Slide Rule (source: charlemagne)

The previous article in this series created a client that communicated with a server in a simple fashion. This article shows how to add features that make the client more viable for production use. Problems we’ll solve include:

  1. Each message received from the server will block the read loop while executing the callback that handles the message, because the loop and callback run in the same goroutine. This also means that we cannot implement the Request() and Flush() methods that the NATS Go client offers.
  2. All publish commands are triggering a flush to the server and blocking when doing so, impacting performance.

We’ll fix these problems in this article. The third and last article in the series will build on the client we create here to add Request/Response functionality for one-to-one communication. Other useful functionality that is not covered in this series, but that a production client should have, includes:

  • Customizing the connection.
  • Using TLS, which is required by many servers.
  • Handling disconnections or protocol errors from the server. We would want the client to try to reconnect to a server after a failure or if the server we were connected to has gone away. All clients maintained by the NATS team support this kind of reconnection functionality.

This article shows how to improve performance by using multiple goroutines, storing messages efficiently, and combining messages so that many can be sent in a single network write.

Improving how messages are consumed

Currently this basic client blocks the reading loop whenever it receives a message:

func (c *Client) processMsg(subj string, reply string, sid int, payload []byte) {
        c.Lock()
        cb, ok := c.subs[sid]
        c.Unlock()

        if ok {
                // NOTE: This blocks the parser read loop too!
                cb(subj, reply, payload)
        }
}

It is very important to change this behavior, because if we open multiple subscriptions we do not want them to block each other. Currently, one slow callback can delay the processing of other messages. For example, the following program publishes 10 messages on the hello subject, to be received on two subscriptions. The first subscription is to the hello subject, while the other is to the > wildcard subject, so it receives each message as well. But since both callbacks run in the same loop whenever the server delivers a message, only one of the callbacks can process a message at a time:

func main() {
        nc := NewClient()
        err := nc.Connect("127.0.0.1:4222")
        if err != nil {
                log.Fatalf("Error: %s", err)
        }
        defer nc.Close()

        recv := 0
        doneCh := make(chan struct{})
        nc.Subscribe("hello", func(subject, reply string, payload []byte) {
                log.Printf("[Received] %s", string(payload))
                recv++
                if recv == 10 {
                        close(doneCh)
                }
        })

        nc.Subscribe(">", func(subject, reply string, payload []byte) {
                log.Printf("[Wildcard] %s", string(payload))

                // Block the other subscription for 1 second.
                time.Sleep(1 * time.Second)
        })

        for i := 0; i < 10; i++ {
                err = nc.Publish("hello", []byte("world"))
                if err != nil {
                        log.Fatalf("Error: %s", err)
                }
        }

        <-doneCh
}

Even worse, the callbacks run on the same goroutine as the read loop. This jeopardizes the entire client. Suppose the server sends a PING message to the client, which it could do at any time to check whether the client is still up. If one of the subscription callbacks in the client hangs for a couple of minutes, the client cannot reply with a PONG, and if it misses a couple of these the server will consider the connection unhealthy and disconnect the client.
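To make the PING problem concrete, here is a minimal simulation rather than real client code. The names pingDelay, slowCallback, and slowFor are made up for this sketch, and the "protocol" is just a slice of two events handled by a single loop, the way our read loop handles them:

```go
package main

import (
	"fmt"
	"time"
)

// slowFor is how long the hypothetical subscription callback blocks.
const slowFor = 50 * time.Millisecond

// slowCallback stands in for a subscription handler that takes a while.
func slowCallback() { time.Sleep(slowFor) }

// pingDelay simulates a single-goroutine read loop that receives one MSG
// followed by one PING, and returns how long the PING had to wait before
// the loop could get to it.
func pingDelay(callback func()) time.Duration {
	events := []string{"MSG", "PING"}
	start := time.Now()
	var delay time.Duration
	for _, ev := range events {
		switch ev {
		case "MSG":
			// The callback runs inline, so the loop cannot look at
			// the next protocol line until it returns.
			callback()
		case "PING":
			// A real client would write PONG to the server here.
			delay = time.Since(start)
		}
	}
	return delay
}

func main() {
	d := pingDelay(slowCallback)
	fmt.Printf("PONG delayed by at least %v: %v\n", slowFor, d >= slowFor)
}
```

The PONG can never be sent sooner than the slowest callback ahead of it in the loop, which is why the callbacks have to move off the read loop's goroutine.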

In the official NATS client, the initial approach taken to improve this situation was to use a goroutine per subscription and then communicate the messages being received by using a buffered channel. Although using a buffered channel for communicating is a very common and idiomatic Go technique, for the NATS client it turned out to not be the best way to deliver the messages in the presence of a large number of subscriptions. We’ll try implementing this solution in order to see its limitations.

Following the buffered channel approach, we can enhance the Subscribe method from the client so that each one of the subscriptions has its own goroutine to process the messages and uses a buffered channel to avoid blocking the reader loop. Whenever the server delivers a message to the client, the message will be sent to the channel. This message processing scheme is shown in the following code:

 
// Msg represents a message received from the server.
type Msg struct {
        Subject string
        Reply   string
        Data    []byte
}

// Subscription represents a subscription by the client.
type Subscription struct {
        mch chan *Msg
}

func (c *Client) Subscribe(subject string, cb MsgHandler) (*Subscription, error) {
        c.Lock()
        defer c.Unlock()
        c.sid += 1
        sid := c.sid

        sub := fmt.Sprintf("SUB %s %d\r\n", subject, sid)
        _, err := c.w.WriteString(sub)
        if err != nil {
                return nil, err
        }

        err = c.w.Flush()
        if err != nil {
                return nil, err
        }

        mch := make(chan *Msg, 8192)
        s := &Subscription{
                mch: mch,
        }

        c.subs[sid] = s

        // Process messages in a dedicated goroutine until the channel is closed.
        go func() {
                for msg := range mch {
                        cb(msg.Subject, msg.Reply, msg.Data)
                }
        }()

        return s, nil
}

Then, when receiving the messages, we try to send the data through the channel. If the buffered channel is full, we drop the message instead (this is what the default case of the select statement in the following snippet does), so the read loop never blocks waiting for room to buffer more messages:

func (c *Client) processMsg(subj string, reply string, sid int, payload []byte) {
        c.Lock()
        sub, ok := c.subs[sid]
        c.Unlock()

        if ok {
                msg := &Msg{subj, reply, payload}
                select {
                case sub.mch <- msg:
                default:
                        // Slow consumer case: the message is dropped because
                        // the subscription is processing messages more slowly
                        // than the server is delivering them.
                }
        }
}
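The drop behavior is easy to see in isolation. The following sketch is separate from the client: the deliver helper is a name invented for this example, the buffer is deliberately tiny, and no consumer is running, so a burst of 10 messages overflows it:

```go
package main

import "fmt"

// Msg mirrors the message type used by the client in this article.
type Msg struct {
	Subject string
	Reply   string
	Data    []byte
}

// deliver attempts a non-blocking send for each message in a burst and
// reports how many were queued and how many were dropped because the
// channel buffer was already full.
func deliver(mch chan *Msg, burst int) (queued, dropped int) {
	for i := 0; i < burst; i++ {
		msg := &Msg{Subject: "hello", Data: []byte("world")}
		select {
		case mch <- msg:
			queued++
		default:
			// Buffer is full and nobody is receiving: drop the message.
			dropped++
		}
	}
	return queued, dropped
}

func main() {
	// Tiny buffer and no consumer, to force drops.
	mch := make(chan *Msg, 4)
	queued, dropped := deliver(mch, 10)
	fmt.Printf("queued=%d dropped=%d\n", queued, dropped) // queued=4 dropped=6
}
```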

This message consumption scheme works, but still has some limitations. Specifically, since buffered channels are used, we now allocate a buffer for messages up front. If the buffer is too small, the client will drop messages when the buffer fills up during a burst of messages from the server. But if the buffer is unnecessarily large, we could waste a lot of memory when we subscribe to a lot of subjects. So this is not a very efficient or flexible solution.

We can confirm in the snippet below that memory consumption would be a problem by using the runtime package. If you try the program out with different buffer sizes, you’ll see that the larger the buffered channel is, the more total allocated bytes will be reported, and that cost is paid once per subscription.

package main

import (
        "fmt"
        "runtime"
)

type Msg struct {
        Subject string
        Reply   string
        Data    []byte
}

func main() {
        // Allocate channel with a large buffer to prevent dropping messages on spikes.
        s := make(chan *Msg, 65536)

        var mem runtime.MemStats
        runtime.ReadMemStats(&mem)

        // => 559112
        fmt.Printf("%+v", mem.TotalAlloc)
        select {
        case <-s:
        default:
        }
}
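To compare several buffer sizes in one run, the measurement can be wrapped in a helper. This is only a rough sketch: allocFor is a name made up here, it measures the difference in TotalAlloc around a single make call, and the exact numbers vary by Go version and platform, so none are shown:

```go
package main

import (
	"fmt"
	"runtime"
)

// Msg mirrors the message type used by the client in this article.
type Msg struct {
	Subject string
	Reply   string
	Data    []byte
}

// allocFor reports roughly how many bytes the runtime allocated while
// creating one buffered channel of the given capacity.
func allocFor(capacity int) uint64 {
	var before, after runtime.MemStats
	runtime.ReadMemStats(&before)
	ch := make(chan *Msg, capacity)
	runtime.ReadMemStats(&after)
	// Keep the channel reachable until after the second measurement.
	runtime.KeepAlive(ch)
	return after.TotalAlloc - before.TotalAlloc
}

func main() {
	for _, capacity := range []int{64, 8192, 65536} {
		fmt.Printf("capacity=%-6d allocated ~%d bytes\n", capacity, allocFor(capacity))
	}
}
```

Multiplying the reported figure by the number of subscriptions gives a feel for how quickly a generous buffer adds up.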

Replacing buffers with a list and sync.Cond

Instead of using a buffered channel for each subscription, a more memory-friendly and flexible technique in Go (and the one currently used in the standard NATS client) is to combine a condition variable with a linked list of the received messages.

// Msg represents a message received from the server.
type Msg struct {
        Subject string
        Reply   string
        Data    []byte
        next    *Msg
}

// Subscription represents a subscription by the client.
type Subscription struct {
        pHead *Msg
        pTail *Msg
        pCond *sync.Cond
        mu    sync.Mutex
        sid   int
}

In the Subscribe method, we start a goroutine that waits on the condition variable until there is a pending message to process. When the parser hands a message to processMsg, it appends the message to the subscription's list and signals the condition variable to wake the goroutine up.

// Subscribe registers interest in a subject.
func (c *Client) Subscribe(subject string, cb MsgHandler) (*Subscription, error) {
        c.Lock()
        defer c.Unlock()
        c.sid += 1
        sid := c.sid

        sub := fmt.Sprintf("SUB %s %d\r\n", subject, sid)
        _, err := c.w.WriteString(sub)
        if err != nil {
                return nil, err
        }

        err = c.w.Flush()
        if err != nil {
                return nil, err
        }

        s := &Subscription{sid: sid}
        s.pCond = sync.NewCond(&s.mu)
        c.subs[sid] = s

        // Wait for messages under a goroutine
        go func() {
                for {
                        s.mu.Lock()
                        if s.pHead == nil {
                                s.pCond.Wait()
                        }
                        msg := s.pHead
                        if msg != nil {
                                s.pHead = msg.next
                                if s.pHead == nil {
                                        s.pTail = nil
                                }
                        }
                        s.mu.Unlock()

                        if msg != nil {
                                cb(msg.Subject, msg.Reply, msg.Data)
                        }
                }
        }()

        return s, nil
}

func (c *Client) processMsg(subj string, reply string, sid int, payload []byte) {
        c.Lock()
        sub, ok := c.subs[sid]
        c.Unlock()

        if ok {
                msg := &Msg{subj, reply, payload, nil}
                sub.mu.Lock()
                if sub.pHead == nil {
                        sub.pHead = msg
                        sub.pTail = msg
                } else {
                        sub.pTail.next = msg
                        sub.pTail = msg
                }
                sub.pCond.Signal()
                sub.mu.Unlock()
        }
}

Now the list can grow indefinitely as needed. Although manipulating the pointers in the list adds a little more overhead than the old buffer solution, it proves better when there is a large number of subscriptions, because it requires much less memory.
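Stripped of the NATS details, the pattern is an unbounded FIFO guarded by a mutex and a condition variable. The sketch below is a standalone illustration, not the client's code: pendingQueue, push, pop, and drain are names invented for it, and pop waits in a for loop, as the sync.Cond documentation recommends, instead of the if plus nil check used above:

```go
package main

import (
	"fmt"
	"sync"
)

type node struct {
	data string
	next *node
}

// pendingQueue is an unbounded FIFO list; cond wakes the consumer
// when a producer appends a message.
type pendingQueue struct {
	mu   sync.Mutex
	cond *sync.Cond
	head *node
	tail *node
}

func newPendingQueue() *pendingQueue {
	q := &pendingQueue{}
	q.cond = sync.NewCond(&q.mu)
	return q
}

// push appends to the tail and signals one waiting consumer.
func (q *pendingQueue) push(data string) {
	n := &node{data: data}
	q.mu.Lock()
	if q.head == nil {
		q.head = n
	} else {
		q.tail.next = n
	}
	q.tail = n
	q.cond.Signal()
	q.mu.Unlock()
}

// pop blocks until a message is available, then removes it from the head.
func (q *pendingQueue) pop() string {
	q.mu.Lock()
	defer q.mu.Unlock()
	for q.head == nil {
		q.cond.Wait() // releases mu while waiting, reacquires on wake-up
	}
	n := q.head
	q.head = n.next
	if q.head == nil {
		q.tail = nil
	}
	return n.data
}

// drain pushes n messages from one goroutine and pops them from another,
// returning them in the order they were consumed.
func drain(n int) []string {
	q := newPendingQueue()
	go func() {
		for i := 0; i < n; i++ {
			q.push(fmt.Sprintf("msg-%d", i))
		}
	}()
	out := make([]string, 0, n)
	for i := 0; i < n; i++ {
		out = append(out, q.pop())
	}
	return out
}

func main() {
	fmt.Println(drain(3)) // [msg-0 msg-1 msg-2]
}
```

Memory is only used for messages that are actually pending, which is the property that makes this approach attractive with many subscriptions.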

Coalescing writes to improve publisher performance

A quick benchmark of the publisher client shows that its performance is poor, especially when compared against the official client. I created the following benchmarking code:

 
import (
        "log"
        "testing"
        "time"

        "github.com/nats-io/go-nats"
)

func Benchmark_Publish_Basic(b *testing.B) {
        b.StopTimer()
        b.StartTimer()

        nc := NewClient()
        err := nc.Connect("127.0.0.1:4222")
        if err != nil {
                log.Fatalf("Error: %s", err)
        }
        defer nc.Close()

        total := 1000000
        payload := []byte("a")
        for i := 0; i < total; i++ {
                nc.Publish("a", payload)
                if i%1000 == 0 {
                        time.Sleep(1 * time.Nanosecond)
                }
        }

        b.StopTimer()
}

func Benchmark_Publish_GoClient(b *testing.B) {
        b.StopTimer()
        b.StartTimer()

        nc, err := nats.Connect("nats://127.0.0.1:4222")
        if err != nil {
                log.Fatalf("Error: %s", err)
        }
        defer nc.Close()

        total := 1000000
        payload := []byte("a")
        for i := 0; i < total; i++ {
                nc.Publish("a", payload)
                if i%1000 == 0 {
                        time.Sleep(1 * time.Nanosecond)
                }
        }

        b.StopTimer()
}

Our implementation is several orders of magnitude slower:

 
go test ./... -v -bench=Benchmark_Publish -benchmem
Benchmark_Publish_Basic-4              1        6680169371 ns/op        82366752 B/op    5972571 allocs/op
Benchmark_Publish_GoClient-4    2000000000             0.09 ns/op              0 B/op          0 allocs/op

The main reason for this is that our implementation calls Flush on pretty much every command sent to the server, in order to send the bytes to the server synchronously. Although flushing after each command ensures that it reaches the server right away, this is hardly ever necessary. The technique taken in the NATS Go client to improve performance was to implement an asynchronous flushing scheme. All the publishing methods in the client add their writes to a bufio.Writer. The client also creates a dedicated goroutine with a flusher loop that waits on a channel, and is signaled whenever it should flush the bytes that may have accumulated so far. Thanks to the coalescing of writes under this simple flushing scheme, the NATS Go client is able to send millions of messages per second to a NATS server.

In contrast to the message consumption problem addressed in the previous section, where we replaced a buffered channel with a linked list and condition variable, a buffered channel fits really well with the coalescing of writes. We can create a buffered channel of empty structs and, after we successfully connect to the server, spin up the flusher goroutine, which waits for the channel to be signaled:

// Client is a basic client for NATS.
type Client struct {
        // ...
        fch chan struct{}
}

// Connect establishes a connection to a NATS server.
func (c *Client) Connect(netloc string) error {
        conn, err := net.Dial("tcp", netloc)
        if err != nil {
                return err
        }
        c.conn = conn
        c.w = bufio.NewWriter(conn)

        // Buffered channel, which will be used to signal
        // the flusher that there is pending data to be
        // sent to the server.
        c.fch = make(chan struct{}, 1024)

        // Spawn goroutine for the parser reading loop.
        go readLoop(c, conn)

        // Spawn a flusher goroutine which waits to be signaled
        // that there is pending data to be flushed.
        go func() {
                for {
                        if _, ok := <-c.fch; !ok {
                                return
                        }
                        c.Lock()
                        if c.w.Buffered() > 0 {
                                c.w.Flush()
                        }
                        c.Unlock()
                }
        }()

        return nil
}

Whenever we send a command to the server, we now only schedule a flush. Publish writes the command into the bufio.Writer and sends a signal on the fch channel, and the flusher loop flushes the bufio.Writer once it receives that signal.

func (c *Client) Publish(subject string, payload []byte) error {
        c.Lock()

        pub := fmt.Sprintf("PUB %s %d\r\n", subject, len(payload))
        _, err := c.w.WriteString(pub)
        if err == nil {
                _, err = c.w.Write(payload)
        }
        if err == nil {
                _, err = c.w.WriteString("\r\n")
        }
        if err != nil {
                c.Unlock()
                return err
        }

        // Signal the flusher loop, unless a flush signal is already pending.
        if len(c.fch) == 0 {
                select {
                case c.fch <- struct{}{}:
                default:
                }
        }
        c.Unlock()

        return nil
}
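The effect of coalescing can be observed without a server by pointing the bufio.Writer at a writer that counts calls. This is a simplified sketch under assumptions: countingWriter and publishAll are names made up for it, the channel is closed at the end so the flusher exits, and the number of underlying writes depends on goroutine scheduling, so only the byte total is predictable:

```go
package main

import (
	"bufio"
	"fmt"
	"sync"
)

// countingWriter stands in for the network connection and records how
// many Write calls, and how many bytes, actually reach it.
type countingWriter struct {
	writes int
	total  int
}

func (w *countingWriter) Write(p []byte) (int, error) {
	w.writes++
	w.total += len(p)
	return len(p), nil
}

// publishAll buffers n small PUB commands, signaling a flusher goroutine
// after each one, and returns the writes and bytes seen by the sink.
func publishAll(n int) (writes, total int) {
	sink := &countingWriter{}
	var mu sync.Mutex
	w := bufio.NewWriter(sink)
	fch := make(chan struct{}, 1024)
	done := make(chan struct{})

	// Flusher loop: wakes up on a signal and flushes whatever has
	// accumulated in the buffer since the last flush.
	go func() {
		defer close(done)
		for range fch {
			mu.Lock()
			if w.Buffered() > 0 {
				w.Flush()
			}
			mu.Unlock()
		}
	}()

	for i := 0; i < n; i++ {
		mu.Lock()
		w.WriteString("PUB a 1\r\na\r\n")
		mu.Unlock()
		// Non-blocking signal: skip it if the channel is already full.
		select {
		case fch <- struct{}{}:
		default:
		}
	}

	// Stop the flusher, then flush anything still buffered.
	close(fch)
	<-done
	mu.Lock()
	w.Flush()
	mu.Unlock()
	return sink.writes, sink.total
}

func main() {
	writes, total := publishAll(1000)
	fmt.Printf("1000 publishes, %d bytes, %d writes to the connection\n", total, writes)
}
```

Each publish only copies 12 bytes into memory, and the sink sees at most one write per publish and usually far fewer.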

With the flusher improvements, the benchmark shows that our client is now much faster, although still not very close to the performance of the official Go client.

 
go test ./... -v -bench=Benchmark_Publish -benchmem
Benchmark_Publish_Basic-4       1000000000               0.79 ns/op            0 B/op          0 allocs/op
Benchmark_Publish_GoClient-4    2000000000               0.09 ns/op 

To learn where the extra performance is coming from in the NATS client, let's check its publishing method. Notably, we find that the client is rolling its own Itoa when crafting the PUB command to tell the server how many bytes it is going to send in the payload:

// Used for hand rolled itoa
const digits = "0123456789"

msgh := []byte("PUB ")
msgh = append(msgh, subject...)
msgh = append(msgh, ' ')

// msgh = strconv.AppendInt(msgh, int64(len(data)), 10)

// faster alternative
var b [12]byte
var i = len(b)
if len(data) > 0 {
        for l := len(data); l > 0; l /= 10 {
                i -= 1
                b[i] = digits[l%10]
        }
} else {
        i -= 1
        b[i] = digits[0]
}
msgh = append(msgh, b[i:]...)
_, err := c.w.Write(msgh)
if err != nil {
        return err
}

Although strconv.AppendInt in the standard library has its own set of optimizations (for example, starting in Go 1.9, there is a fast path for ints lower than 100), the hand-rolled approach is still faster for NATS, even though its 12-byte scratch buffer constrains the payload size to 12 decimal digits, or just under 1000 GB. This trade-off is fine, as the NATS server is not really suited to handle payload sizes that large (the maximum payload size allowed by default is 1 MB, though it can be raised via the server config).
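To experiment with the conversion on its own, the loop can be lifted into a small function and checked against the standard library. The appendSize name is an assumption of this sketch and is not part of the NATS client. Like the original loop, it would index out of range for sizes with more than 12 digits:

```go
package main

import (
	"fmt"
	"strconv"
)

// Used for hand rolled itoa.
const digits = "0123456789"

// appendSize appends the decimal representation of a payload size to dst,
// filling a fixed 12-byte scratch buffer from right to left.
func appendSize(dst []byte, size int) []byte {
	var b [12]byte
	i := len(b)
	if size > 0 {
		for l := size; l > 0; l /= 10 {
			i--
			b[i] = digits[l%10]
		}
	} else {
		i--
		b[i] = digits[0]
	}
	return append(dst, b[i:]...)
}

func main() {
	for _, size := range []int{0, 7, 1024, 1048576} {
		hand := string(appendSize([]byte("PUB hello "), size))
		std := string(strconv.AppendInt([]byte("PUB hello "), int64(size), 10))
		fmt.Printf("%q matches strconv: %v\n", hand, hand == std)
	}
}
```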

With the improvements in this article, the client is now more reliable and shows much better performance characteristics when publishing and receiving messages. In the next article, we will leverage these improvements and implement a low-latency request/response mechanism that uses ephemeral NATS subscriptions and Go's context package for cancellation of blocking requests.

Article image: Slide Rule (source: charlemagne).