Parallel railway tracks (source: music4life via Pixabay).

One of Go’s flagship features is its powerful concurrency primitives, like channels and goroutines. But goroutines are often a foreign concept to newcomers, so it’s not uncommon to see new learners get frustrated as they try to master concurrency.

The first tool the Go team released to help manage goroutines was sync.WaitGroup, which blocks until a specified number of goroutines have finished executing. Here’s an example from the documentation:

    var wg sync.WaitGroup
    var urls = []string{
            "http://www.golang.org/",
            "http://www.google.com/",
            "http://www.somestupidname.com/",
    }
    for _, url := range urls {
            // Increment the WaitGroup counter.
            wg.Add(1)
            // Launch a goroutine to fetch the URL.
            go func(url string) {
                    // Decrement the counter when the goroutine completes.
                    defer wg.Done()
                    // Fetch the URL.
                    http.Get(url)
            }(url)
    }
    // Wait for all HTTP fetches to complete.
    wg.Wait()

WaitGroups made it significantly easier to deal with concurrency in Go because they reduced the amount of accounting you had to do when launching goroutines. Every time you launch a goroutine you increment the WaitGroup by calling wg.Add(). When one finishes, it calls wg.Done(). To wait for all of them to complete, you call wg.Wait(), which blocks until they’ve all finished. The only issue is that a WaitGroup has no way to report errors: if a problem happens in one of your goroutines, it is difficult to figure out what the error was.
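To make that last point concrete, here is a minimal sketch of the kind of plumbing you end up writing when you want errors back from goroutines managed by a plain WaitGroup. The names runAll and errBoom are made up for this illustration, and a buffered error channel is only one of several ways to do it.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errBoom is a made-up error used only for this illustration.
var errBoom = errors.New("boom")

// runAll (a hypothetical helper) runs every task in its own goroutine,
// waits for all of them, and returns one of the reported errors, or nil
// if no task failed.
func runAll(tasks []func() error) error {
	var wg sync.WaitGroup
	// Buffered so that every goroutine can report without blocking.
	errs := make(chan error, len(tasks))
	for _, task := range tasks {
		wg.Add(1)
		go func(task func() error) {
			defer wg.Done()
			if err := task(); err != nil {
				errs <- err
			}
		}(task)
	}
	wg.Wait()
	close(errs)
	// Receiving from a closed, empty channel yields the zero value, nil.
	return <-errs
}

func main() {
	tasks := []func() error{
		func() error { return nil },
		func() error { return errBoom },
	}
	fmt.Println(runAll(tasks)) // prints "boom"
}
```

Every call site that wants errors has to repeat some version of this channel and close dance.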

Extending sync.WaitGroup’s functionality

Recently the Go team added a new package called errgroup to the golang.org/x/sync repository; I’ll refer to it as sync.ErrGroup throughout. sync.ErrGroup extends sync.WaitGroup by adding error propagation and the ability to cancel an entire set of goroutines when an unrecoverable error occurs or a timeout is reached. Here’s the same example rewritten to use an errgroup.Group:

var g errgroup.Group
var urls = []string{
    "http://www.golang.org/",
    "http://www.google.com/",
    "http://www.somestupidname.com/",
}
for _, url := range urls {
    // Launch a goroutine to fetch the URL.
    url := url // https://golang.org/doc/faq#closures_and_goroutines
    g.Go(func() error {
        // Fetch the URL.
        resp, err := http.Get(url)
        if err == nil {
            resp.Body.Close()
        }
        return err
    })
}
// Wait for all HTTP fetches to complete.
if err := g.Wait(); err == nil {
    fmt.Println("Successfully fetched all URLs.")
}

The g.Go() function above is a wrapper that allows you to launch an anonymous function but still capture the errors that it may return without all the verbose plumbing that would otherwise be required. It’s a significant improvement in developer productivity when using goroutines.
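To give a feel for what such a wrapper has to do, here is a simplified stand-in built only from the standard library. It is a sketch, not the package’s actual source: miniGroup and errFetch are hypothetical names, and it leaves out the context cancellation that the real package offers.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errFetch is a made-up error for the demo.
var errFetch = errors.New("fetch failed")

// miniGroup is a hypothetical, simplified stand-in for errgroup.Group.
type miniGroup struct {
	wg      sync.WaitGroup
	errOnce sync.Once
	err     error
}

// Go runs f in a new goroutine and remembers the first non-nil error.
func (g *miniGroup) Go(f func() error) {
	g.wg.Add(1)
	go func() {
		defer g.wg.Done()
		if err := f(); err != nil {
			g.errOnce.Do(func() { g.err = err })
		}
	}()
}

// Wait blocks until every function passed to Go has returned, then
// reports the first error that was recorded, if any.
func (g *miniGroup) Wait() error {
	g.wg.Wait()
	return g.err
}

func main() {
	var g miniGroup
	g.Go(func() error { return nil })
	g.Go(func() error { return errFetch })
	fmt.Println(g.Wait()) // prints "fetch failed"
}
```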

To exercise all of the functionality of sync.ErrGroup, I’ve written a small program that recursively searches a directory for Go files containing a specified pattern. This might be useful to find instances in your Go source tree where you’ve used a package that has been deprecated or updated. I also added a time limit feature to the application: if the time limit is reached, all of the searching and processing goroutines are cancelled and the program exits.

When applied against the directory for my sample application it produces these results:

$ gogrep -timeout 1000ms . fmt
gogrep.go
1 hits

If you call it without the right number of parameters it prints the correct usage:

gogrep by Brian Ketelsen
Flags:
  -timeout duration
    	timeout in milliseconds (default 500ms)
Usage:
	gogrep [flags] path pattern

How sync.ErrGroup makes application building easier

Let’s take a look at the code and see how sync.ErrGroup makes this application so easy to build. We’ll start with main() because I like to read code like a story, and every code story starts with main().

package main

import (
	"bytes"
	"context"
	"flag"
	"fmt"
	"io/ioutil"
	"log"
	"os"
	"path/filepath"
	"strings"
	"time"

	"golang.org/x/sync/errgroup"
)

func main() {
	duration := flag.Duration("timeout", 500*time.Millisecond, "timeout in milliseconds")
	flag.Usage = func() {
		fmt.Printf("%s by Brian Ketelsen\n", os.Args[0])
		fmt.Println("Usage:")
		fmt.Printf("	gogrep [flags] path pattern \n")
		fmt.Println("Flags:")
		flag.PrintDefaults()

	}
	flag.Parse()
	if flag.NArg() != 2 {
		flag.Usage()
		os.Exit(-1)
	}
	path := flag.Arg(0)
	pattern := flag.Arg(1)
	ctx, cancel := context.WithTimeout(context.Background(), *duration)
	defer cancel()
	m, err := search(ctx, path, pattern)
	if err != nil {
		log.Fatal(err)
	}
	for _, name := range m {
		fmt.Println(name)
	}
	fmt.Println(len(m), "hits")
}

The first part of main() sets up the flags and arguments that are expected and prints a usage message when the program is called without the right number of arguments. The first line of interest is the call to context.WithTimeout():

	ctx, cancel := context.WithTimeout(context.Background(), *duration)
	defer cancel()

Here, I’ve created a new context.Context with a timeout attached to it. The timeout duration is set to the duration flag variable. When the timeout is reached, the Done channel of “ctx” and of every context derived from it is closed, which tells anything waiting on it that time is up. WithTimeout also returns a cancel function. It’s good practice to defer a call to it so the context’s resources are released as soon as main() returns, and go vet flags code that discards it.

The next line calls the search() function, passing in the context, search path, and search pattern. Finally the results are printed to the terminal followed by a count of search hits.

Breaking down the search() function

The search() function is a little longer than main() so I’ll break it up as I explain what’s happening.

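The first thing that happens in the search function is the creation of a new errgroup with errgroup.WithContext(). It returns a Group, which does all the accounting for the concurrency that will follow, along with a derived context that is cancelled as soon as any of the group’s goroutines returns an error or Wait() returns.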

func search(ctx context.Context, root string, pattern string) ([]string, error) {
	g, ctx := errgroup.WithContext(ctx)

Next, I created a channel to keep track of all of the files that need to be searched. Later we’ll send search candidates to this channel for further processing to determine if they’re a match to the supplied pattern. This channel has a buffer of 100 so the processing goroutines can get started before the file search goroutines finish.

	paths := make(chan string, 100)

The two core methods of the errgroup.Group type are Go() and Wait(). Go() launches tasks and Wait() blocks until they’ve all completed. Here, we call Go() with an anonymous function that returns an error.

	g.Go(func() error {

Next, we defer closing the “paths” channel to signal that all of the directory searching has completed. This allows us to use Go’s “range” statement later to process all the candidate files in more goroutines.

		defer close(paths)

Finally we use the filepath package’s Walk() function to recursively look through all the files in the directory specified in the command line arguments. The callback passes along any error Walk() reports, skips anything that isn’t a regular file, and then takes a fast-path exit for any file that doesn’t have the “.go” suffix. There’s no point in searching for Go source code in files that aren’t Go source code.

		return filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
			if err != nil {
				return err
			}
			if !info.Mode().IsRegular() {
				return nil
			}
			if !strings.HasSuffix(info.Name(), ".go") {
				return nil
			}

Spotting the real power of sync.ErrGroup

Each of the above conditions will abandon processing for the current file because it isn’t a candidate for our search. Anything that makes it past this point is a Go source file that we want to examine. Here’s where the real power of sync.ErrGroup starts to show. I use a select statement with two possible cases. The first case sends the name of the file to the “paths” channel, where another goroutine will search its contents. The second case waits for the context to be cancelled. As long as there is still time left on the clock, the current file will be sent for processing. When the timer expires, the context’s Done channel is closed, the second case fires, and the callback returns ctx.Err(), which stops the file searching routine.

			select {
			case paths <- path:
			case <-ctx.Done():
				return ctx.Err()
			}
			return nil
		})

	})

Next I created a channel to handle all the files that matched the search pattern.

	c := make(chan string, 100)

Now we can iterate over the files in the paths channel and search their contents.

	for path := range paths {

One thing here that’s worth pointing out: because the goroutine is a closure, it captures the surrounding variables themselves, not a snapshot of their values. Because we’re going to have multiple goroutines, we need to copy the current value of the path variable inside the loop. Otherwise, on Go versions before 1.22, all goroutines would be operating on a single path variable whose value may have changed on the next iteration of the for loop. That would be a nasty race condition. (Go 1.22 and later give each iteration its own variable, so the copy is harmless but no longer required.)

		p := path
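Here is the same copy-per-iteration idiom in a tiny standalone program. The function name collect and the file names are invented for the illustration, and the results are sorted only so that the output does not depend on goroutine scheduling.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// collect (hypothetical) starts one goroutine per input and returns what
// each goroutine saw, sorted so the result is deterministic.
func collect(inputs []string) []string {
	var (
		mu   sync.Mutex
		seen []string
		wg   sync.WaitGroup
	)
	for _, in := range inputs {
		in := in // per-iteration copy; redundant but harmless on Go 1.22+
		wg.Add(1)
		go func() {
			defer wg.Done()
			mu.Lock()
			seen = append(seen, in)
			mu.Unlock()
		}()
	}
	wg.Wait()
	sort.Strings(seen)
	return seen
}

func main() {
	fmt.Println(collect([]string{"a.go", "b.go", "c.go"})) // prints "[a.go b.go c.go]"
}
```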

Now we’ll fire off another anonymous function for every candidate file. This function reads the contents of the Go source file and checks to see if it contains the supplied search pattern.

		g.Go(func() error {
			data, err := ioutil.ReadFile(p)
			if err != nil {
				return err
			}
			if !bytes.Contains(data, []byte(pattern)) {
				return nil
			}

Once again we’ll use a select statement to watch for the timeout firing before our processing has completed.

			select {
			case c <- p:
			case <-ctx.Done():
				return ctx.Err()
			}
			return nil
		})
	}

This goroutine waits for all of the errgroup’s goroutines to complete and then closes the results channel, signalling that all processing is complete and terminating the range statement below.

	go func() {
		g.Wait()
		close(c)
	}()

Now we collect the results from the channel and put them in a slice to return back to main().

	var m []string
	for r := range c {
		m = append(m, r)
	}
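The wait-then-close goroutine is a general pattern that works with a plain WaitGroup too. The sketch below uses made-up names (squares, results) and an unbuffered channel to show why the close has to happen in its own goroutine.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// squares (hypothetical) computes n*n for each input in its own
// goroutine and gathers the results through a channel that a separate
// goroutine closes once every worker is done.
func squares(nums []int) []int {
	var wg sync.WaitGroup
	results := make(chan int) // unbuffered: workers block until collected
	for _, n := range nums {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			results <- n * n
		}(n)
	}
	// Calling wg.Wait() inline here would deadlock, because the workers
	// cannot finish until the loop below starts receiving.
	go func() {
		wg.Wait()
		close(results)
	}()
	var out []int
	for r := range results {
		out = append(out, r)
	}
	sort.Ints(out) // arrival order depends on scheduling
	return out
}

func main() {
	fmt.Println(squares([]int{3, 1, 2})) // prints "[1 4 9]"
}
```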

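Finally we’ll wrap up by checking for errors in the errgroup. Calling Wait() a second time is safe and returns the first error that any of the goroutines above reported, if there was one. That error goes back to main() along with whatever results were collected before it occurred, and main() discards those results when it logs the error and exits.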

	return m, g.Wait()
}

This simple application is far from optimal. For example, I read the entire contents of each source code file into memory before testing the contents for a pattern match. A streaming reader would be much more efficient. The channel buffer size of 100 is arbitrary, and it only limits how many paths and results can be queued: a goroutine is launched for every candidate file, so nothing caps how many files can be read into memory at once. If you have a huge directory full of huge Go source files, it might make a noticeable dent in your memory consumption. I was able to search my entire Go source tree for occurrences of the “fmt” package in just a few seconds without any spikes in CPU or memory usage, so I’ve decided to leave it unoptimized.

The full source code is available on GitHub.

If you have Go installed, you can also run “go get github.com/bketelsen/gogrep” from your command line to download the application.

I hope you’re as excited as I am about how sync.Errgroup makes concurrency in Go so much easier. If you’re not using it yet, now is the time to start—you’ll be glad you did. Concepts like these are a core part of my upcoming O’Reilly Go Beyond the Basics in-person training in Boston, October 3 & 4, and also my online training October 25 & 26.
