Reading shards from a Kinesis stream
Here is part of my implementation of reading a Kinesis stream. I am not confident that this is the best way to synchronize returns from goroutines.

Assuming there are N shards in the stream, the code spawns one goroutine per shard for processing. Each goroutine works indefinitely until the underlying context is canceled or until an error is encountered during processing.

The code below is supposed to wait either for all N routines to terminate successfully (i.e. without any of them writing to errc), or for at least one routine to write to errc (return an error).

If all routines terminate without writing to errc, the WaitGroup is released, closing the errc and done channels, which in turn resumes the current goroutine. However, if one of the routines terminates by writing to errc, we force all routines to terminate by calling the underlying contexts' cancel functions and wait for the done channel to close.
func (c *Consumer) scanShards(col Collector, streamName string, ids []string) error {
	checkpoints, err := col.GetSequenceNumberForShards(streamName, ids)
	if err != nil {
		return fmt.Errorf("error retrieving stored checkpoints for shards: %v", err)
	}

	errc := make(chan error, 1)
	done := make(chan error, 1)
	wg := sync.WaitGroup{}
	wg.Add(len(ids))

	for _, id := range ids {
		seqNum := checkpoints[id]
		ctx, cancel := context.WithCancel(context.Background())
		c.cancelFuncs = append(c.cancelFuncs, cancel)

		go func(ctx context.Context, shardID, startSeqNum string) {
			defer wg.Done()
			if err := c.scanShard(ctx, shardID, startSeqNum); err != nil {
				errc <- fmt.Errorf("error in shard %q: %v", shardID, err)
			}
		}(ctx, id, seqNum)
	}

	go func() {
		wg.Wait()
		close(errc)
		close(done)
	}()

	err = <-errc
	if err != nil {
		// Cancel all scans, to release the worker goroutines from
		// the pool.
		for _, cancel := range c.cancelFuncs {
			cancel()
		}
	}

	<-done // Wait for all goroutines to exit.
	return err
}
go concurrency
edited yesterday by 200_success
asked Jan 1 at 18:06 by States
1 Answer
Alright, there are a number of issues with the code you posted here. I'll go through it all and point out the issues, step by step:
func (c *Consumer) scanShards(col Collector, streamName string, ids []string) error {
Alright, so we have a *Consumer (exported type) with an unexported scanShards method. That's fine. The thing I'm slightly baffled by is why the Collector is passed as an argument here. In AWS Kinesis terms, a Collector is what you use to send stuff to Kinesis in a single request; to me, it's sort of like a buffered writer. This collector, however, seems to provide the consumer with data about the shards that you're about to consume. The collector is actually a core dependency of your consumer, it seems. It'd make sense for the consumer to have the underlying collector assigned to a field:
type Consumer struct {
	collector   Collector            // Rename needed, though
	cancelFuncs []context.CancelFunc // more on this later!
}
Because you're dealing with streams, I'd strongly recommend you use the context package, too (I know you are, but you're not getting the most out of it - see below). Convention dictates that the context argument comes first, so the function would now look like this:
func (c *Consumer) scanShards(ctx context.Context, streamName string, ids []string) error {
The arguments could probably do with some renaming, but I'm probably the worst person to suggest a good variable name. Anyway, onwards:
checkpoints, err := col.GetSequenceNumberForShards(streamName, ids)
if err != nil {
	return fmt.Errorf("error retrieving stored checkpoints for shards: %v", err)
}
Right, so the "collector" has an exported function getting sequence numbers, and you're expecting the collector to be passed in as an argument. This is kind of weird. It'd make much more sense for someone to use a Consumer object and call something like consumer.GetShards(), because that actually tells me something: I know that I'm working with a consumer that takes its data from shards X, Y, and Z.
I've looked at what comes next, but I'll be returning to the checkpoints variable later on. First, let me just recommend you look at this package for errors. You're essentially returning a specific error (which is OK), but it's a hard one to detect on the caller side. The string value of the error still contains all of the raw/initial error value. With the package I linked to, you'd be able to write the same like so:
return errors.Wrap(err, "error retrieving stored checkpoints for shards")
Replace the string constant with an exported constant like so:
const ErrRetrievingCheckpoints = "error retrieving stored checkpoints for shards"
// and use:
return errors.Wrap(err, ErrRetrievingCheckpoints)
And you'll have a much easier time actually detecting what error was returned, without losing the underlying error details.
OK, let's move on to the channel stuff now:
errc := make(chan error, 1)
done := make(chan error, 1)
wg := sync.WaitGroup{}
wg.Add(len(ids))
Right, so you're creating a waitgroup and adding the length of ids to it, so there's going to be one routine per ID. Fine. Then you create an error and a done channel, both with a buffer of 1. I'll explain how in a bit, but you can (and should) get rid of the done channel entirely. Having said that, channels like done are generally defined as done := make(chan struct{}): no buffer needed, and the empty struct is defined in the Go spec as a zero-byte type.
The error channel, however, has a bit of a problem: Suppose I'm going to start 10 routines, and I've got 3 errors. The first routine to err will write its error value onto the channel, and be done with it. The second and third routines to fail will be blocked, waiting for you to read the error from the channel. You have the code to do this, so you'll get that error, and the second routine will now write a new error value to the channel. The third routine is still blocked!
What your code does next is cancel all routines (sort of) and wait for the wg.Wait() call to return. Because the third routine is blocked, this will never happen: you've got a deadlock on your hands. The done channel won't get closed, and thus this function won't return, all because your error channel is blocking routines from completing. That's not what we want. There's a quick (and not always ideal) solution to this problem: increase the channel buffer:
errCh := make(chan error, len(ids)-1) // we'll always read 1, hence -1 is big enough
Of course, we're assuming ids is not empty. Just in case, it's always a good idea to add a basic check like if len(ids) == 0 and return early.
Still, I'll explain later how we fix this without having to increase the channel buffers too much. For now, let's carry on with the line-by-line review:
for _, id := range ids {
	seqNum := checkpoints[id]
Yes, I have an issue with this already. checkpoints clearly is a variable of type map[string]string. I know this because ids is []string, and the seqNum variable is passed to a goroutine as a string. Why, then, iterate over a slice and then perform a lookup in a map to get the corresponding value? Why not simply write:
for id, seqNum := range checkpoints {
That's a lot cleaner, and reduces the risk of someone else looking at your code and adding something like this:
seqNum, ok := checkpoints[id]
if !ok {
	return errors.New("checkpoint missing")
}
Never mind the risk of someone changing the GetSequenceNumberForShards function to not return an error if not all IDs were found! Suppose your ids var holds []string{"foo", "bar", "zar", "car"}, but the checkpoints map was returned without the zar key...
For the same reason, you really should replace wg.Add(len(ids)) with wg.Add(len(checkpoints)).
Right, onwards, let's get stuck in with the context:
ctx, cancel := context.WithCancel(context.Background())
c.cancelFuncs = append(c.cancelFuncs, cancel)
Why? Why on earth? You're creating a cancel context for each routine and appending the cancel func to a slice, without a way to retrieve a specific cancel function. The slice is also stored on the Consumer type. The function uses a pointer receiver, so there's a genuine risk of race conditions here! Suppose I do this:
go consumer.scanShards(col1, "foo", []string{"a", "b", "c"})
go consumer.scanShards(col2, "bar", []string{"foo", "bar", "car", "zar"})
Both routines are appending to the same slice of cancel functions. They're both calling all of them blindly, too. These routines are messing things up for each other. And that's just one example; imagine someone coming along and writing this:
func (c *Consumer) Reset() {
	// restoring some fields
	c.cancelFuncs = []context.CancelFunc{}
	// more fields here
}
These kinds of things can be really hard to debug, and your code is very vulnerable to these issues. You may think that the Reset func I've written here is an unlikely scenario, but given that there's nothing in your code that actually removes the cancel values from the slice, your slice will grow, and grow, and grow, and things will get slowed down.
OK, I had to rant about these things somewhat, but remember how I said that you ought to have your first argument be a context? Right, do it: it allows the caller to pass in a context.WithCancel or a context.WithTimeout, for example, so the caller can determine whether they want to/need to wait on your function to return. Secondly: why not have all your routines share the same context? You're just re-wrapping the Background context anyway. Instead of doing that, I'd simply wrap the context from the argument once (outside the loop):
rctx, cfunc := context.WithCancel(ctx)
If the context passed as an argument gets cancelled, the cancellation will propagate. If you want to cancel the context yourself, call cfunc, and all your routines will receive the cancellation signal (<-ctx.Done()). The caller is unaffected.
So with this in mind, let's rewrite the loop a bit (more improvements to follow below, but here's what we have thus far):
func (c *Consumer) scanShards(ctx context.Context, stream string, ids []string) error {
	if len(ids) == 0 {
		return ErrNoIdsProvided
	}

	checkpoints, err := c.collector.GetSequenceNumberForShards(stream, ids)
	if err != nil {
		return errors.Wrap(err, ErrRetrievingCheckpoints)
	}
	if len(checkpoints) == 0 {
		return ErrNothingToCheck // something like this, should be handled properly
	}

	// note ids is irrelevant, checkpoints is the source of truth now
	errCh := make(chan error, len(checkpoints)-1)
	// we'll get rid of this, but I've not explained how, so it's here still:
	done := make(chan struct{})
	wg := sync.WaitGroup{}
	wg.Add(len(checkpoints))

	// wrap around ctx argument once!
	rctx, cfunc := context.WithCancel(ctx)

	for id, seqNum := range checkpoints {
		go func(ctx context.Context, shardID, startSeqNum string) {
			defer wg.Done()
			if err := c.scanShard(ctx, shardID, startSeqNum); err != nil {
				errCh <- fmt.Errorf("error in shard %q: %v", shardID, err)
			}
		}(rctx, id, seqNum)
	}
OK, that's where we are now. Because rctx is not in the for loop's scope, and is shared by all routines, you can simply write the routine like so:
go func(id, seqNum string) {
	defer wg.Done()
	// note: rctx, not ctx!
	if err := c.scanShard(rctx, id, seqNum); err != nil {
		errCh <- err
	}
}(id, seqNum)
The routine centers around c.scanShard, a function you've not posted, so I have no idea what it does (how it uses the context, etc.). What is clear to me, though, is that it's an unexported function and you have control over it. You could just as easily make it into a function that behaves exactly like the routine you wrapped the call in, so you can replace the go func() bit simply with this:
go c.scanShard(rctx, &wg, id, seqNum, errCh)
// given scanShard looks like this:
func (c *Consumer) scanShard(ctx context.Context, wg *sync.WaitGroup, id, seqNum string, errCh chan<- error)
Anyway, the rest of your code is about tying things together:
go func() {
	wg.Wait()
	close(errc)
	close(done)
}()
So: waiting for the routines to return before closing the channels, which is fair enough, seeing as writing to a closed channel panics. Closing the done channel here, though, is what actually allows your function to return. But does it add any value?
err = <-errc
if err != nil {
	for _, cancel := range c.cancelFuncs {
		cancel()
	}
}

<-done // Wait for all goroutines to exit.
return err
}
And this is where things get really messy: you're first checking the error channel (fair enough), but because you're not sure whether you passed the blocking read (err = <-errCh) due to the channel being closed or due to an actual error being sent, you have to check whether you actually have an error. If not, you still read from the done channel, completely pointlessly. Even with your code as it stands, the done channel is completely redundant: the error channel already does the same thing.

Get rid of done; even with your current code, it serves no purpose whatsoever.
Alternative approach
I've been ranting plenty for now, and I think this is a decent starting point. When I started looking at your code, my mind did jump to select statements, and to using ctx.Done() to stop calling the scanShard function. I'm not sure it's actually the better way for this particular case, but it might be worth considering. I'll just include a quick write-up of how I'd write this function using ctx.Done and select to control the flow. It's a bit more verbose, and actually looks more complex than it needs to be IMHO, but it's just to show an alternative approach that in some cases might be worth considering. I have to say, all the code in this answer is untested and written as I went along, so typos and bugs are possible:
package main

import (
	"context"
	"fmt"
	"sync"
)

func (c *Consumer) scanShards(ctx context.Context, streamName string, ids []string) error {
	checkpoints, err := c.collector.GetSequenceNumberForShards(streamName, ids)
	if err != nil {
		// zerolog
		c.logger.Error().Err(err).Msg("Error retrieving stored checkpoints for shards")
		return err
	}

	// buffered for every worker, so no sender can ever block; no close needed
	errc := make(chan error, len(checkpoints))
	wg := sync.WaitGroup{}
	wg.Add(len(checkpoints))

	rctx, cfunc := context.WithCancel(ctx)
	for id, seqNum := range checkpoints {
		go func(shardID, startSeqNum string) {
			defer wg.Done()
			select {
			case <-rctx.Done():
				// context cancelled, no need to do anything
				return
			default:
				if err := c.scanShard(rctx, shardID, startSeqNum); err != nil {
					c.logger.Error().Err(err).Msgf("Error in shard %q", shardID)
					errc <- fmt.Errorf("error in shard %q: %v", shardID, err)
				}
			}
		}(id, seqNum)
	}

	go func() {
		// wg cleared -> cancel context
		wg.Wait()
		cfunc()
	}()

	select {
	case <-rctx.Done():
		// rctx was cancelled after the waitgroup cleared, so nothing to do here
		return nil
	case err := <-errc:
		// error -> cancel routines
		cfunc()
		return err
	}
}
add a comment |
Your Answer
StackExchange.ifUsing("editor", function () {
return StackExchange.using("mathjaxEditing", function () {
StackExchange.MarkdownEditor.creationCallbacks.add(function (editor, postfix) {
StackExchange.mathjaxEditing.prepareWmdForMathJax(editor, postfix, [["\$", "\$"]]);
});
});
}, "mathjax-editing");
StackExchange.ifUsing("editor", function () {
StackExchange.using("externalEditor", function () {
StackExchange.using("snippets", function () {
StackExchange.snippets.init();
});
});
}, "code-snippets");
StackExchange.ready(function() {
var channelOptions = {
tags: "".split(" "),
id: "196"
};
initTagRenderer("".split(" "), "".split(" "), channelOptions);
StackExchange.using("externalEditor", function() {
// Have to fire editor after snippets, if snippets enabled
if (StackExchange.settings.snippets.snippetsEnabled) {
StackExchange.using("snippets", function() {
createEditor();
});
}
else {
createEditor();
}
});
function createEditor() {
StackExchange.prepareEditor({
heartbeatType: 'answer',
autoActivateHeartbeat: false,
convertImagesToLinks: false,
noModals: true,
showLowRepImageUploadWarning: true,
reputationToPostImages: null,
bindNavPrevention: true,
postfix: "",
imageUploader: {
brandingHtml: "Powered by u003ca class="icon-imgur-white" href="https://imgur.com/"u003eu003c/au003e",
contentPolicyHtml: "User contributions licensed under u003ca href="https://creativecommons.org/licenses/by-sa/3.0/"u003ecc by-sa 3.0 with attribution requiredu003c/au003e u003ca href="https://stackoverflow.com/legal/content-policy"u003e(content policy)u003c/au003e",
allowUrls: true
},
onDemand: true,
discardSelector: ".discard-answer"
,immediatelyShowMarkdownHelp:true
});
}
});
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
StackExchange.ready(
function () {
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fcodereview.stackexchange.com%2fquestions%2f210698%2freading-shards-from-a-kinesis-stream%23new-answer', 'question_page');
}
);
Post as a guest
Required, but never shown
1 Answer
1
active
oldest
votes
1 Answer
1
active
oldest
votes
active
oldest
votes
active
oldest
votes
Alright, there's a number of issues with the code you posted here. I'll go through it all, and point out issues, step by step:
func (c *Consumer) scanShards(col Collector, streamName string, ids string) error {
Alright, so we have a *Consumer
(exported type), with a non-exported scanShards
function. That's fine. The thing I'm slightly baffled by is why the Collector
is passed as an argument here. In AWS Kinesis terms, a Collector
is what you use to send stuff to Kinesis in a single request. To me, it's sort of like a buffered writer. This collector, however, seems to provide the consumer with data about the shards that you're about to consume. The collector is actually a core dependency of your consumer, it seems. It'd make sense for the consumer to have the underlying collector assigned to a field:
type Consumer struct {
collector Collector // Rename needed, though
cancelFuncs context.CancelFunc // more on this later!
}
Because you're dealing with streams, I'd strongly recommend you use the context package, too (I know you are, but you're not getting the most out of it - see below). Convention dictates that the context argument comes first, so the function would now look like this:
func (c *Consumer) scanShards(ctx context.Context, streamName string, ids string) error {
The arguments could probably do with some renaming, but I'm probably the worst person to suggest a good variable name. Anyway, onwards:
checkpoints, err := col.GetSequenceNumberForShards(streamName, ids)
if err != nil {
return fmt.Errorf("error retrieving stored checkpoints for shards: %v", err)
}
Right, so the "collector" has an exported function getting sequence, and you're expecting the collector to be passed in as an argument. This is kind of weird. It'd make much more sense for someone to use a Consumer
object and call something like consumer.GetShards()
, because that actually tells me something: I know that I'm working with a consumer that is taking its data from a shards X, Y, and Z.
I've looked at what comes next, but I'll be returning to the checkpoints
variable later on. First let me just recommend you look at this package for errors. You're essentially returning a specific error (which is OK), but it's a hard one to detect on the caller-side. The string value of the error still contains all of the raw/initial error value. With the package I linked to, you'd be able to write the same like so:
return errors.Wrap(err, "error retrieving stored checkpoints for shards")
Replace the string constant with an exported constant like so
const ErrRetrievingCheckpoints = "error retrievign stroed checkpoints for shards")
// and use:
return errors.Wrap(err, ErrRetrievingCheckpoints)
And you'll have a much easier time actually detecting what error was returned, without losing the underlying error details.
OK, let's move on to the channel stuff now:
errc := make(chan error, 1)
done := make(chan error, 1)
wg := sync.WaitGroup{}
wg.Add(len(ids))
Right, So you're creating a waitgroup, and add the length of ids to it, so there's going to be 1 routine per ID. Fine. Then you create an error and done channel, both with a buffer of 1. I'll explain how in a bit, but you can (and should) get rid of the done
channel entirely. Having said that, channels like done
are generally defined as done := make(chan struct{})
: no buffer needed, and an empty struct is defined in the golang specs as a 0-byte type.
The error channel, however, has a bit of a problem: Suppose I'm going to start 10 routines, and I've got 3 errors. The first routine to err will write its error value onto the channel, and be done with it. The second and third routines to fail will be blocked, waiting for you to read the error from the channel. You have the code to do this, so you'll get that error, and the second routine will now write a new error value to the channel. The third routine is still blocked!
What your code does next, is cancel all routines (sort of), and wait for the wg.Wait()
call to return. Because the third routine is blocked, this will never happen. You've got a deadlock on your hands. The done
channel won't get closed, and thus this function won't return, all because your error channel is blocking routines from completing. That's not what we want. There's a quick (and not always ideal) solution to this problem: increase the channel buffer:
errCh := make(chan error, len(ids) -1) // we'll always read 1, hence -1 is big enough
Of course, we're assuming ids
is not empty. Just in case, it's always a good idea to add some basic checks like if len(ids) == 0
and return early.
Still, I'll explain later how we fix this without having to increase the channel buffers too much. For now, let's carry on with the line-by-line review:
for _, id := range ids {
seqNum := checkpoints[id]
Yes, I have an issue with this already. checkpoints
clearly is a var of type map[string]string
. I know this because ids
is string
, and the seqNum
variable is passed to a goroutine with the type string
. Why, then, iterate over a slice, and then perform a lookup in a map to get the corresponding value? Why not simply write:
for id, seqNum := range checkpoints {
That's a lot cleaner, and reduces the risk of someone else looking at your code and adding something like this:
seqNum, ok := checkpoints[id]
if !ok {
return errors.New("checkpoint missing")
}
Nevermind the risk of someone changing the GetSequenceNumberForShards
function to not return an error if not all id's were found! Suppose your ids
var holds {"foo", "bar", "zar", "car"}
, but the checkpoints
map only was returned without the zar
key...
For the same reason, you really should replace wg.Add(len(ids))
with wg.Add(len(checkpoints))
Right, onwards, let's get stuck in with the context:
ctx, cancel := context.WithCancel(context.Background())
c.cancelFuncs = append(c.cancelFuncs, cancel)
Why? Why on earth? You're creating a cancel context for each routine, and append the cancel func to a slice without a way to retrieve a specific cancel function. The slice is also stored on the Consumer
type. The function uses a pointer receiver, so there's a genuine risk of race conditions here! Suppose I do this:
go consumer.scanShards(col1, "foo", string{"a", "b", "c"})
go consumer.scanShards(col2, "bar", string{"foo", "bar", "car", "zar"})
Both routines are appending to the same slice of cancel functions. They're both calling all of them blindly, too. These routines are messing things up for eachother. And that's just one example, imagine someone coming along and writing this:
func (c *Consumer) Reset() {
// restoring some fields
c.cancelFuncs = context.CancelFunc{}
// more fields here
}
These kind of things can be really hard to debug, and your code is very vulnerable to these issues. You may think that the Reset
func I've written here is an unlikely scenario, but given that there's nothing in your code that actually removes the cancel
values from the slice, your slice will grow, and grow, and grow, and things will get slowed down.
OK, I had to rant about these things somewhat, but remember how I said that you ought to have your first argument be a context? Right, do it, it allows the caller to pass in a context.WithCancel
, or context.WithTimeout
, for example, so the caller can determine whether they want to/need to wait on your function to return. Secondly: why not have all your routines share the same context?. You're just re-wrapping the Background
context anyway. Instead of doing that, I'd simply wrap the context from the argument once (outside the loop):
rctx, cfunc := context.WithCancel(ctx)
If the context passed as an argument gets cancelled, the cancellation will propagate, if you want to cancel the context, call cfunc
, and all your routines will receive the cancellation signal (<-ctx.Done()
). The caller is unaffected.
So with this in mind, let's rewrite the loop a bit (more improvements to follow below, but what we have thusfar):
func (c *Consumer) scanShards(ctx context.Context, stream string, ids string) error {
if len(ids) == 0 {
return ErrNoIdsProvided
}
checkpoints, err := c.collector.GetSequenceNumberForShards(stream, ids)
if err != nil {
return errors.Wrap(err, ErrRetrievingCheckpoints)
}
if len(checkpoints) == 0 {
return ErrNothingToCheck // something like this, should be handled properly
}
// note ids is irrelevant, checkpoints is the source of truth now
errCh := make(chan error, len(checkpoints) - 1)
// we'll get rid of this, but I've not explained how, so it's here still:
done := make(chan struct{})
wg := sync.WaitGroup{}
wg.Add(len(checkpoints))
// wrap around ctx argument once!
rctx, cfunc := context.WithCancel(ctx)
for id, seqNum := range checkpoints {
go func(ctx context.Context, shardID, startSeqNum string) {
defer wg.Done()
if err := c.scanShard(ctx, shardID, startSeqNum); err != nil {
errc <- fmt.Errorf("error in shard %q: %v", shardID, err)
}
}(rctx, id, seqNum)
}
OK, that's where we are now. Because the rctx
is not in the for
scope, and is shared by all routines, you can simply write the routine like so:
go func(id, seqNum string) {
    defer wg.Done()
    // note: rctx, not ctx!
    if err := c.scanShard(rctx, id, seqNum); err != nil {
        errCh <- err
    }
}(id, seqNum)
The routine centers around c.scanShard
, a function you've not posted, and thus I have no idea what it does (how it uses the context etc...). What is clear to me, though, is that it's an unexported function, and you have control over it. You could just as easily make it into a function that behaves exactly like the routine you wrapped the call in, so you can replace the go func()
bit simply with this:
go c.scanShard(rctx, &wg, id, seqNum, errCh)
// given scanShard looks like this:
func (c *Consumer) scanShard(ctx context.Context, wg *sync.WaitGroup, id, seqNum string, errCh chan<- error)
Anyway, the rest of your code is about tying things together:
go func() {
    wg.Wait()
    close(errc)
    close(done)
}()
So waiting for the routines to return before closing the channels, which is fair enough, seeing as writing to a closed channel is bad. Closing the done
channel here, though, is what actually allows your function to return. But does it add any value?
err = <-errc
if err != nil {
    for _, cancel := range c.cancelFuncs {
        cancel()
    }
}
<-done // Wait for all goroutines to exit.
return err
}
And this is where things get really messy: you're first checking the error channel (fair enough), but because you're not sure whether you've passed the blocking read (err = <-errCh
) because the channel was closed or not, you have to check whether or not you actually have an error. If not, you're still checking the done
channel, completely pointlessly. Even with your code as it stands, the done
channel is completely redundant, the error channel already does the same thing.
Get rid of done
, even with your current code, it serves no purpose whatsoever.
Alternative approach
I've been ranting plenty for now, and I think this is a decent starting point. When I started looking at your code, my mind did jump to select
statements, and using the ctx.Done()
to stop calling the scanShard
function. I'm not sure if it's actually the better way for this particular case, but it might be worth considering. I'll just include a quick write-up of how I'd write this function using ctx.Done
and select
to control the flow. It's a bit more verbose, and actually looks more complex than it needs to be IMHO, but it's just to show an alternative approach that in some cases might be worth considering. I have to say, all code in the answer is untested, and just written as I went along, so typos and bugs are possible:
package main

import (
    "context"
    "fmt"
    "sync"
)

func (c *Consumer) scanShards(ctx context.Context, streamName string, ids []string) error {
    checkpoints, err := c.collector.GetSequenceNumberForShards(streamName, ids)
    if err != nil {
        // zerolog
        c.logger.Error().Err(err).Msg("Error retrieving stored checkpoints for shards")
        return err
    }
    // buffered so that no routine ever blocks on a write we don't read
    errc := make(chan error, len(checkpoints))
    wg := sync.WaitGroup{}
    wg.Add(len(checkpoints))
    rctx, cfunc := context.WithCancel(ctx)
    defer cfunc()
    for id, seqNum := range checkpoints {
        go func(shardID, startSeqNum string) {
            defer wg.Done()
            select {
            case <-rctx.Done():
                // context cancelled, no need to do anything
                return
            default:
                if err := c.scanShard(rctx, shardID, startSeqNum); err != nil {
                    c.logger.Error().Err(err).Msgf("Error in shard %q", shardID)
                    errc <- fmt.Errorf("error in shard %q: %v", shardID, err)
                }
            }
        }(id, seqNum)
    }
    go func() {
        // wg cleared -> cancel context
        wg.Wait()
        cfunc()
    }()
    select {
    case <-rctx.Done():
        // the waitgroup cleared (or the caller cancelled); check whether
        // an error slipped in just before the cancellation
        select {
        case err := <-errc:
            return err
        default:
            return nil
        }
    case err := <-errc:
        // error -> cancel remaining routines
        cfunc()
        return err
    }
}
Alright, there's a number of issues with the code you posted here. I'll go through it all, and point out issues, step by step:
func (c *Consumer) scanShards(col Collector, streamName string, ids string) error {
Alright, so we have a *Consumer
(exported type), with a non-exported scanShards
function. That's fine. The thing I'm slightly baffled by is why the Collector
is passed as an argument here. In AWS Kinesis terms, a Collector
is what you use to send stuff to Kinesis in a single request. To me, it's sort of like a buffered writer. This collector, however, seems to provide the consumer with data about the shards that you're about to consume. The collector is actually a core dependency of your consumer, it seems. It'd make sense for the consumer to have the underlying collector assigned to a field:
type Consumer struct {
collector Collector // Rename needed, though
cancelFuncs context.CancelFunc // more on this later!
}
Because you're dealing with streams, I'd strongly recommend you use the context package, too (I know you are, but you're not getting the most out of it - see below). Convention dictates that the context argument comes first, so the function would now look like this:
func (c *Consumer) scanShards(ctx context.Context, streamName string, ids string) error {
The arguments could probably do with some renaming, but I'm probably the worst person to suggest a good variable name. Anyway, onwards:
checkpoints, err := col.GetSequenceNumberForShards(streamName, ids)
if err != nil {
return fmt.Errorf("error retrieving stored checkpoints for shards: %v", err)
}
Right, so the "collector" has an exported function getting sequence, and you're expecting the collector to be passed in as an argument. This is kind of weird. It'd make much more sense for someone to use a Consumer
object and call something like consumer.GetShards()
, because that actually tells me something: I know that I'm working with a consumer that is taking its data from a shards X, Y, and Z.
I've looked at what comes next, but I'll be returning to the checkpoints
variable later on. First let me just recommend you look at this package for errors. You're essentially returning a specific error (which is OK), but it's a hard one to detect on the caller-side. The string value of the error still contains all of the raw/initial error value. With the package I linked to, you'd be able to write the same like so:
return errors.Wrap(err, "error retrieving stored checkpoints for shards")
Replace the string constant with an exported constant like so
const ErrRetrievingCheckpoints = "error retrievign stroed checkpoints for shards")
// and use:
return errors.Wrap(err, ErrRetrievingCheckpoints)
And you'll have a much easier time actually detecting what error was returned, without losing the underlying error details.
OK, let's move on to the channel stuff now:
errc := make(chan error, 1)
done := make(chan error, 1)
wg := sync.WaitGroup{}
wg.Add(len(ids))
Right, so you're creating a waitgroup, and add the length of ids to it, so there's going to be 1 routine per ID. Fine. Then you create an error and a done channel, both with a buffer of 1. I'll explain how in a bit, but you can (and should) get rid of the done
channel entirely. Having said that, channels like done
are generally defined as done := make(chan struct{})
: no buffer needed, and an empty struct occupies zero bytes.
The error channel, however, has a bit of a problem: Suppose I'm going to start 10 routines, and I've got 3 errors. The first routine to err will write its error value onto the channel, and be done with it. The second and third routines to fail will be blocked, waiting for you to read the error from the channel. You have the code to do this, so you'll get that error, and the second routine will now write a new error value to the channel. The third routine is still blocked!
What your code does next, is cancel all routines (sort of), and wait for the wg.Wait()
call to return. Because the third routine is blocked, this will never happen. You've got a deadlock on your hands. The done
channel won't get closed, and thus this function won't return, all because your error channel is blocking routines from completing. That's not what we want. There's a quick (and not always ideal) solution to this problem: increase the channel buffer:
errCh := make(chan error, len(ids)-1) // we'll always read 1, hence -1 is big enough
Of course, we're assuming ids
is not empty. Just in case, it's always a good idea to add some basic checks like if len(ids) == 0
and return early.
Still, I'll explain later how we fix this without having to increase the channel buffers too much. For now, let's carry on with the line-by-line review:
for _, id := range ids {
seqNum := checkpoints[id]
Yes, I have an issue with this already. checkpoints
clearly is a var of type map[string]string
. I know this because ids
is []string
, and the seqNum
variable is passed to a goroutine with the type string
. Why, then, iterate over a slice, and then perform a lookup in a map to get the corresponding value? Why not simply write:
for id, seqNum := range checkpoints {
That's a lot cleaner, and reduces the risk of someone else looking at your code and adding something like this:
seqNum, ok := checkpoints[id]
if !ok {
return errors.New("checkpoint missing")
}
Never mind the risk of someone changing the GetSequenceNumberForShards
function to not return an error if not all IDs were found! Suppose your ids
var holds []string{"foo", "bar", "zar", "car"}
, but the checkpoints
map was returned without the zar
key...
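A quick runnable illustration of why that scenario is dangerous (data made up): indexing a map with a missing key doesn't fail, it silently yields the zero value, so the routine for "zar" would start from an empty sequence number.

```go
package main

import "fmt"

func main() {
	// Hypothetical data: "zar" is missing from the returned checkpoints.
	checkpoints := map[string]string{"foo": "1", "bar": "2", "car": "4"}
	ids := []string{"foo", "bar", "zar", "car"}

	// Iterating ids and indexing the map silently yields "" for "zar":
	for _, id := range ids {
		fmt.Printf("%s -> %q\n", id, checkpoints[id])
	}
}
```

Ranging over checkpoints instead makes that failure mode impossible.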
For the same reason, you really should replace wg.Add(len(ids))
with wg.Add(len(checkpoints))
Right, onwards, let's get stuck in with the context:
ctx, cancel := context.WithCancel(context.Background())
c.cancelFuncs = append(c.cancelFuncs, cancel)
Why? Why on earth? You're creating a cancel context for each routine, and append the cancel func to a slice without a way to retrieve a specific cancel function. The slice is also stored on the Consumer
type. The function uses a pointer receiver, so there's a genuine risk of race conditions here! Suppose I do this:
go consumer.scanShards(col1, "foo", []string{"a", "b", "c"})
go consumer.scanShards(col2, "bar", []string{"foo", "bar", "car", "zar"})
Both routines are appending to the same slice of cancel functions. They're both calling all of them blindly, too. These routines are messing things up for each other. And that's just one example, imagine someone coming along and writing this:
func (c *Consumer) Reset() {
// restoring some fields
c.cancelFuncs = []context.CancelFunc{}
// more fields here
}
These kinds of things can be really hard to debug, and your code is very vulnerable to these issues. You may think that the Reset
func I've written here is an unlikely scenario, but given that there's nothing in your code that actually removes the cancel
values from the slice, your slice will grow, and grow, and grow, and things will get slowed down.
OK, I had to rant about these things somewhat, but remember how I said that you ought to have your first argument be a context? Right, do it: it allows the caller to pass in a context.WithCancel
, or context.WithTimeout
, for example, so the caller can determine whether they want to/need to wait on your function to return. Secondly: why not have all your routines share the same context? You're just re-wrapping the Background
context anyway. Instead of doing that, I'd simply wrap the context from the argument once (outside the loop):
rctx, cfunc := context.WithCancel(ctx)
If the context passed as an argument gets cancelled, the cancellation will propagate, if you want to cancel the context, call cfunc
, and all your routines will receive the cancellation signal (<-ctx.Done()
). The caller is unaffected.
So with this in mind, let's rewrite the loop a bit (more improvements to follow below, but this is what we have thus far):
func (c *Consumer) scanShards(ctx context.Context, stream string, ids []string) error {
if len(ids) == 0 {
return ErrNoIdsProvided
}
checkpoints, err := c.collector.GetSequenceNumberForShards(stream, ids)
if err != nil {
return errors.Wrap(err, ErrRetrievingCheckpoints)
}
if len(checkpoints) == 0 {
return ErrNothingToCheck // something like this, should be handled properly
}
// note ids is irrelevant, checkpoints is the source of truth now
errCh := make(chan error, len(checkpoints)-1)
// we'll get rid of this, but I've not explained how, so it's here still:
done := make(chan struct{})
wg := sync.WaitGroup{}
wg.Add(len(checkpoints))
// wrap around ctx argument once!
rctx, cfunc := context.WithCancel(ctx)
for id, seqNum := range checkpoints {
go func(ctx context.Context, shardID, startSeqNum string) {
defer wg.Done()
if err := c.scanShard(ctx, shardID, startSeqNum); err != nil {
errCh <- fmt.Errorf("error in shard %q: %v", shardID, err)
}
}(rctx, id, seqNum)
}
OK, that's where we are now. Because the rctx
is not in the for
scope, and is shared by all routines, you can simply write the routine like so:
go func(id, seqNum string) {
defer wg.Done()
// note: rctx, not ctx!
if err := c.scanShard(rctx, id, seqNum); err != nil {
errCh <- err
}
}(id, seqNum)
The routine centers around c.scanShard
, a function you've not posted, and thus I have no idea what it does (how it uses the context etc...). What is clear to me, though, is that it's an unexported function, and you have control over it. You could just as easily make it into a function that behaves exactly like the routine you wrapped the call in, so you can replace the go func()
bit simply with this:
go c.scanShard(rctx, &wg, id, seqNum, errCh)
// given scanShard looks like this:
func (c *Consumer) scanShard(ctx context.Context, wg *sync.WaitGroup, id, seqNum string, errCh chan<- error)
Anyway, the rest of your code is about tying things together:
go func() {
wg.Wait()
close(errc)
close(done)
}()
So waiting for the routines to return before closing the channels, which is fair enough, seeing as writing to a closed channel causes a panic. Closing the done
channel here, though is what actually allows your function to return. But does it add any value?
err = <-errc
if err != nil {
for _, cancel := range c.cancelFuncs {
cancel()
}
}
<-done // Wait for all goroutines to exit.
return err
}
And this is where things get really messy: you're first checking the error channel (fair enough), but because you're not sure whether you've passed the blocking read (err = <-errc
) because the channel was closed or because there was an error, you have to check whether you actually have an error. If not, you're still checking the done
channel, completely pointlessly. Even with your code as it stands, the done
channel is completely redundant, the error channel already does the same thing.
Get rid of done
, even with your current code, it serves no purpose whatsoever.
Alternative approach
I've been ranting plenty for now, and I think this is a decent starting point. When I started looking at your code, my mind did jump to select
statements, and using the ctx.Done()
to stop calling the scanShard
function. I'm not sure if it's actually the better way for this particular case, but it might be worth considering. I'll just include a quick write-up of how I'd write this function using ctx.Done
and select
to control the flow. It's a bit more verbose, and actually looks more complex than it needs to be IMHO, but it's just to show an alternative approach that in some cases might be worth considering. I have to say, all code in the answer is untested, and just written as I went along, so typos and bugs are possible:
package main
import (
"context"
"fmt"
"sync"
)
func (c *Consumer) scanShards(ctx context.Context, streamName string, ids []string) error {
checkpoints, err := c.collector.GetSequenceNumberForShards(streamName, ids)
if err != nil {
// zerolog
c.logger.Error().Err(err).Msg("Error retrieving stored checkpoints for shards")
return err
}
errc := make(chan error, len(checkpoints)) // big enough that no sender can ever block or write after we return
wg := sync.WaitGroup{}
wg.Add(len(checkpoints))
rctx, cfunc := context.WithCancel(ctx)
for id, seqNum := range checkpoints {
go func(shardID, startSeqNum string) {
defer wg.Done()
select {
case <-rctx.Done():
// context cancelled, no need to do anything
return
default:
if err := c.scanShard(rctx, shardID, startSeqNum); err != nil {
c.logger.Error().Err(err).Msgf("Error in shard %q", shardID)
errc <- fmt.Errorf("error in shard %q: %v", shardID, err)
}
}
}(id, seqNum)
}
go func() {
// wg cleared -> cancel context
wg.Wait()
cfunc()
}()
select {
case <-rctx.Done():
// rctx was cancelled after the waitgroup cleared, so nothing to do here
return nil
case err := <-errc:
// error -> cancel routines
cfunc()
return err // return the first error
}
}
answered yesterday
Elias Van Ootegem