Radio Telescope (source: Didgeman)

The first two parts of this series built a general-purpose client that can subscribe to subjects on a NATS server, send messages, and wait for responses. One of the most common communication models, however, is request/response, in which two clients engage in one-to-one, bidirectional communication. NATS is a pure PubSub system: everything is built on top of publish and subscribe operations. The NATS Go client therefore supports the request/response model by layering it on the PubSub methods we have already developed.

Because making a request involves waiting for a response, it is a natural fit for the context package, which is gaining adoption in Go and was designed for request-oriented APIs: it provides support for deadlines, cancellation signals, and request-scoped data. Cancellation propagation is an important topic in Go because it allows us to quickly reclaim any resources that may be in use as soon as the in-flight request or a parent context is canceled. If there is a blocking call in your library, its users can benefit from a context-aware API that lets them manage cancellation in an idiomatic way. Contexts also allow you to set a hard deadline to time out the request, and to include request-scoped data such as a request ID, which can be useful for tracing the request.

When making a NATS request, we may want to give up waiting after a certain time by using context.WithTimeout, or cancel an in-flight request at any point and propagate the cancellation by calling the CancelFunc returned along with the context.

The signature of the Request method now looks like this:

// Request takes a context, a subject, and a payload
// and returns a response as a byte array or an error.
func (c *Client) Request(ctx context.Context, subj string, payload []byte) ([]byte, error) {
      // ...
}

With context-based cancellation, its sample usage is:

package main

import (
        "context"
        "log"
        "time"
)

func main() {
        nc := NewClient()
        err := nc.Connect("127.0.0.1:4222")
        if err != nil {
                log.Fatalf("Error: %s", err)
        }
        defer nc.Close()

        nc.Subscribe("help", func(subject, reply string, payload []byte) {
                log.Printf("[Received] %s", string(payload))
                nc.Publish(reply, []byte("ok!"))
        })

        ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
        defer cancel()

        for {
                response, err := nc.Request(ctx, "help", []byte("please"))
                if err != nil {
                        break
                }
                log.Println("[Response]", string(response))
        }
}

Internally, we will implement Request/Response support by creating a unique subscription to an inbox for each client. When publishing a request to the server, the client will pass the identifier for the subscription, and then wait for another subscriber to publish the response to the inbox of the client.

The resulting protocol looks like this:

# Requestor
SUB _INBOX.nwOOaWSeWrt0ok20pKFfNz.*  2
PUB help _INBOX.nwOOaWSeWrt0ok20pKFfNz.nwOOaWSeWrt0ok20pKFfPo 6
please
MSG _INBOX.nwOOaWSeWrt0ok20pKFfNz.nwOOaWSeWrt0ok20pKFfPo 2 10
I can help

# Responder
SUB help  1
MSG help 1 _INBOX.nwOOaWSeWrt0ok20pKFfNz.nwOOaWSeWrt0ok20pKFfPo 6
please
PUB _INBOX.nwOOaWSeWrt0ok20pKFfNz.nwOOaWSeWrt0ok20pKFfPo 10
I can help
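To make the routing in this exchange concrete, here is a small in-memory sketch of how a single wildcard inbox can serve many in-flight requests. No server or network is involved, and the mux type with its expect and deliver methods is a hypothetical name used only for illustration; it is not part of the NATS client.

```go
package main

import (
	"fmt"
	"strings"
	"sync"
)

// mux is a hypothetical, in-memory stand-in for the client's response
// routing. It maps the last token of a reply subject to a waiting request.
type mux struct {
	mu      sync.Mutex
	prefix  string                 // e.g. "_INBOX.<id>." including the trailing dot
	pending map[string]chan []byte // token -> channel of the waiting request
}

func newMux(prefix string) *mux {
	return &mux{prefix: prefix, pending: make(map[string]chan []byte)}
}

// expect registers a token and returns the reply subject to send along
// with the request, plus the channel on which the reply will arrive.
func (m *mux) expect(token string) (string, <-chan []byte) {
	ch := make(chan []byte, 1)
	m.mu.Lock()
	m.pending[token] = ch
	m.mu.Unlock()
	return m.prefix + token, ch
}

// deliver plays the role of the wildcard subscription callback: it hands
// the message to the request waiting on that subject, if there is one.
func (m *mux) deliver(subject string, data []byte) bool {
	if !strings.HasPrefix(subject, m.prefix) {
		return false
	}
	token := strings.TrimPrefix(subject, m.prefix)
	m.mu.Lock()
	ch, ok := m.pending[token]
	delete(m.pending, token) // only the first reply is delivered
	m.mu.Unlock()
	if !ok {
		return false
	}
	ch <- data // capacity 1 and a single sender, so this does not block
	return true
}

func main() {
	m := newMux("_INBOX.nwOOaWSeWrt0ok20pKFfNz.")
	reply, ch := m.expect("nwOOaWSeWrt0ok20pKFfPo")
	fmt.Println("reply subject:", reply)

	// The responder publishes to the reply subject it received.
	m.deliver(reply, []byte("I can help"))
	fmt.Println("response:", string(<-ch))
}
```

Because the pending entry is removed before the message is handed over, a second reply to the same subject is simply dropped.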

To generate unique identifiers, we can use the crypto/rand package from the Go standard library (the NATS client itself uses the custom-written nuid library, which generates identifiers with better performance). What follows is a sample implementation of the setup the client needs in order to receive responses, similar to the new request/response scheme in the 1.3.0 release of the NATS client.

type Client struct {
        // respSub is the wildcard subject on which the responses
        // are received.
        respSub string

        // respMap is a map of request inboxes to the channel that is
        // signaled when the response is received.
        respMap map[string]chan []byte

        // respMux is the subscription on which the responses
        // are received.
        respMux *Subscription

        // respSetup ensures that the wildcard subscription
        // for responses is set up only once.
        respSetup sync.Once
}

// Request takes a context, a subject, and a payload
// and returns a response as a byte array or an error.
func (c *Client) Request(ctx context.Context, subj string, payload []byte) ([]byte, error) {
        c.Lock()
        // Set up request subscription if we haven't done so already.
        if c.respMap == nil {
                u := make([]byte, 11)
                if _, err := io.ReadFull(rand.Reader, u); err != nil {
                        c.Unlock()
                        return nil, err
                }
                c.respSub = fmt.Sprintf("%s.%s.*", "_INBOX", hex.EncodeToString(u))
                c.respMap = make(map[string]chan []byte)
        }

        // Buffered channel that awaits a single response.
        dataCh := make(chan []byte, 1)
        u := make([]byte, 11)
        if _, err := io.ReadFull(rand.Reader, u); err != nil {
                c.Unlock()
                return nil, err
        }
        token := hex.EncodeToString(u)
        c.respMap[token] = dataCh

        ginbox := c.respSub
        prefix := c.respSub[:29] // _INBOX. + unique prefix
        respInbox := fmt.Sprintf("%s.%s", prefix, token)
        createSub := c.respMux == nil
        c.Unlock()

        if createSub {
                var err error
                c.respSetup.Do(func() {
                        fn := func(subj, reply string, data []byte) {
                                // _INBOX. + unique prefix + . + token
                                respToken := subj[30:]

                                // Dequeue the first response only.
                                c.Lock()
                                mch := c.respMap[respToken]
                                delete(c.respMap, respToken)
                                c.Unlock()
                                select {
                                case mch <- data:
                                default:
                                        return
                                }
                        }

                        var sub *Subscription
                        sub, err = c.Subscribe(ginbox, fn)
                        c.Lock()
                        c.respMux = sub
                        c.Unlock()
                })
                if err != nil {
                        return nil, err
                }
        }
        // ...
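The fixed offsets 29 and 30 in the listing above follow from the subject layout: "_INBOX." is 7 characters and 11 random bytes encode to 22 hexadecimal characters. As a hedged sketch, the same steps can be written with the lengths derived from the strings. The helper names newToken, respInbox, and respToken are hypothetical, not part of the NATS client.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
	"strings"
)

const inboxPrefix = "_INBOX."

// newToken returns 22 hexadecimal characters (11 random bytes) and
// reports any error from the random source.
func newToken() (string, error) {
	u := make([]byte, 11)
	if _, err := rand.Read(u); err != nil {
		return "", err
	}
	return hex.EncodeToString(u), nil
}

// respInbox builds the per-request reply subject from the wildcard
// subject ("_INBOX.<id>.*") by replacing the trailing "*" with token.
func respInbox(respSub, token string) string {
	return strings.TrimSuffix(respSub, "*") + token
}

// respToken extracts the request token from a reply subject, reporting
// false if the subject does not belong to this wildcard inbox.
func respToken(respSub, subject string) (string, bool) {
	prefix := strings.TrimSuffix(respSub, "*")
	if !strings.HasPrefix(subject, prefix) {
		return "", false
	}
	return subject[len(prefix):], true
}

func main() {
	id, err := newToken()
	if err != nil {
		panic(err)
	}
	token, err := newToken()
	if err != nil {
		panic(err)
	}

	respSub := inboxPrefix + id + ".*"
	inbox := respInbox(respSub, token)
	got, ok := respToken(respSub, inbox)

	fmt.Println(respSub)
	fmt.Println(inbox)
	fmt.Println(ok && got == token)
	// The token starts at the same offset as the subj[30:] slice above.
	fmt.Println(len(inbox) - len(token))
}
```

Deriving the offsets this way costs a few extra string operations per message, so the hard-coded slices in the listing may still be preferable on a hot path.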

Then we publish the request, tagging it with the unique inbox for this request. We signal a flush and then wait until either the client's shared response subscription receives the reply or the context is canceled or times out.

func (c *Client) Request(ctx context.Context, subj string, payload []byte) ([]byte, error) {
        // ...

        // Publish the request along with the payload, then wait
        // for the reply to be sent to the client or for the
        // context to time out:
        //
        // PUB subject reply-inbox #number-of-bytes\r\n
        // <payload>\r\n
        //
        msgh := []byte("PUB ")
        msgh = append(msgh, subj...)
        msgh = append(msgh, ' ')
        msgh = append(msgh, respInbox...)
        msgh = append(msgh, ' ')

        var b [12]byte
        i := len(b)
        if len(payload) > 0 {
                for l := len(payload); l > 0; l /= 10 {
                        i--
                        b[i] = digits[l%10]
                }
        } else {
                i--
                b[i] = digits[0]
        }
        msgh = append(msgh, b[i:]...)
        msgh = append(msgh, _CRLF_...)

        c.Lock()
        _, err := c.w.Write(msgh)
        if err == nil {
                _, err = c.w.Write(payload)
        }
        if err == nil {
                _, err = c.w.WriteString(_CRLF_)
        }

        if err != nil {
                // Drop the pending entry so that it does not leak.
                delete(c.respMap, token)
                c.Unlock()
                return nil, err
        }

        // Signal a flush for the request if there are none pending (length is zero).
        if len(c.fch) == 0 {
                select {
                case c.fch <- struct{}{}:
                default:
                }
        }
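        c.Unlock()

        // Wait for the response via the data channel, or give up if the
        // context is done.
        select {
        case data := <-dataCh:
                return data, nil
        case <-ctx.Done():
                // Remove the pending entry so that abandoned requests
                // do not accumulate in the map.
                c.Lock()
                delete(c.respMap, token)
                c.Unlock()
                return nil, ctx.Err()
        }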
}
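The header assembly in the listing above can also be isolated into a small function that is easy to test on its own. The sketch below is an alternative formulation, not the client's code: pubHeader is a hypothetical helper, and it swaps in strconv.AppendInt for the hand-written digit loop.

```go
package main

import (
	"fmt"
	"strconv"
)

// pubHeader builds the protocol line "PUB <subject> [reply] <size>\r\n".
// The reply subject is omitted when it is empty.
func pubHeader(subj, reply string, size int) []byte {
	h := make([]byte, 0, len(subj)+len(reply)+24)
	h = append(h, "PUB "...)
	h = append(h, subj...)
	h = append(h, ' ')
	if reply != "" {
		h = append(h, reply...)
		h = append(h, ' ')
	}
	// strconv.AppendInt writes the decimal size directly into the
	// slice, replacing the manual digit loop.
	h = strconv.AppendInt(h, int64(size), 10)
	h = append(h, '\r', '\n')
	return h
}

func main() {
	payload := []byte("please")
	hdr := pubHeader("help", "_INBOX.abc.def", len(payload))
	fmt.Printf("%q\n", hdr)
}
```

Whether this or the manual loop is faster for a given workload is something to measure with a benchmark rather than assume.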

Wrapping up

This series of articles demonstrated some of the Go techniques that have been adopted in the NATS client during its evolution. We’ll finish with a couple of observations that can help you adapt these solutions to your particular needs.

  • Channels might not always be the best solution for communicating. They're great for signaling, but sometimes not the best for sharing data. In the case of the NATS client, a simple mutex and a linked list gave better results.
  • Remember that the Go standard library is designed to be useful in the general case. Depending on the circumstances of your program, what you get in the standard library might not be the best fit. Go has great tooling for making data-driven decisions about the best approach.
  • The context package is very useful and helps you write more readable code when you’re dealing with cancellation and timeouts. Keep in mind, though, that the community is still actively figuring out the best practices around the use of this package.

Lastly, it should be noted that the Go community moves fast and things continue to improve under the hood, so it is important to keep up with the ecosystem and to give feedback as well, in order to help improve the libraries and tools further (see, for example, the number of experience reports collected from the community). Everyone on the forums is very helpful, and there are many great resources. Joining the Gopher Slack is a great idea too, if you are a Slack user.

In this series we have shown only a few examples of how the NATS project has benefited from the flexibility and simplicity of Go. If you are interested in more techniques that have worked for the NATS team in writing high-performance Go code, you can also check out the GopherCon 2014 talk on the subject by Derek Collison (creator of NATS) and the excellent StrangeLoop 2017 talk by Tyler Treat of the NATS team. These offer many more insights on improving the performance of Go programs.

Article image: Radio Telescope (source: Didgeman).