Building messaging in Go network clients

Learn useful Go communication techniques from the internals of the NATS client.

By Waldemar Quevedo
November 8, 2017
Bottle (source: PeterBjorndal)

In today’s world of data-intensive applications and frameworks tailored to tasks as varied as service discovery and stream processing, distributed systems are becoming ubiquitous. A reliable and high-performing networking client is essential to accessing and scaling such systems. Often, implementing a network client for a custom protocol can seem like a daunting task. If a protocol is too complex (as custom protocols have a tendency to become), maintaining and implementing the client can be a burden. Moreover, it is ideal to have good language support for doing asynchronous programming when implementing anything at a higher level than working with sockets. Handling all this customization and multi-language support can be greatly simplified by picking the right approach from the outset.

Fortunately, the Go programming language facilitates the task of developing networking clients by offering a robust standard library, excellent concurrency built-ins, and great tooling for doing benchmarks and static analysis, as well as having great performance and overall being a very flexible language for systems programming.

Many useful lessons can be drawn from the NATS project, a simple, high-performance, open source messaging system for cloud native applications, created in 2010 by Derek Collison for Cloud Foundry. The original codebase was written in Ruby and was rewritten in Go starting in 2012 as part of Collison’s next venture, Apcera. NATS has benefited a lot from this change. Most notably, the shift to Go brought substantial performance gains (a single NATS server can now process around 14M messages per second). The Go community itself has been amazing, and a great group to be involved in.

During the past 5 years, both the NATS and Go communities have been evolving rapidly, so there have been consistent improvements to NATS that follow improvements to Go.

This series of three articles shows you how to communicate in a fast and scalable manner with large numbers of servers. The recommendations are based on recent practices adopted by the Go NATS client, such as:

  • Using buffered channels for backpressure, and coalescing writes with the bufio package from Go, to achieve fast publishing.
  • Implementing a message consumption scheme using condition variables and a lock, which uses much less memory than buffered channels.
  • Using Go benchmarking tools to choose an adequate Itoa implementation for assembling protocol commands.
  • Adopting the context package for the request/response mechanism and cancellation.

The three articles will use the same principles to show you how to handle communications in your own client.

One of the advantages of NATS is its dedication to simplicity. The protocol is plain text and fairly straightforward, which helps tremendously in writing and maintaining clients for it. Publishers send messages to any number of subjects they want (the subject being simply a string included as the first argument of each message), and clients subscribe to the subjects they choose. When subscribing, a client picks an arbitrary identifier called a subscription ID, or sid, and the server includes it along with the subject in each message delivered for that subscription, so the client can tell which subscription the message matched. A message requiring a reply must also contain a reply-to field, which is just another string representing a subject. The recipient publishes its reply to that subject, and the original sender receives the reply there. Finally, messages list the number of bytes in the payload, followed by the payload itself.

A client can send to the server the following types of protocol messages:

# Sent to server to specify connection information, like the name for the client (optional)
CONNECT {"name": "hello-world"}\r\n

# Send message of 5 bytes to 'hello' subject
PUB hello 5\r\n
world\r\n

# Send message of 5 bytes to 'hello' subject, expecting reply on 'inbox' subject
PUB hello inbox 5\r\n
world\r\n

# Subscribe to 'hello' subject to receive messages, using '1' as an identifier
SUB hello 1\r\n

# Unsubscribe from subscription using identifier '1'
UNSUB 1\r\n

Then a client can expect any of the following protocol messages from the server:

# Sent by the server after establishing TCP connection to provide parameters about the connection
INFO {"max_payload": 1048576}\r\n

# Message of 5 bytes on 'hello' subject identified with '1' by client
MSG hello 1 5\r\n
world\r\n

# Message of 5 bytes on 'hello' subject identified with '1' by client with reply subject 'inbox'
MSG hello 1 inbox 5\r\n
world\r\n

# Acknowledges a well-formed protocol message from the client
+OK\r\n

# Error message sent by the server
-ERR ''\r\n

As you can see from the examples, the payload in a message is delivered as raw bytes after the size argument, and is terminated by \r\n characters.
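
To make the framing concrete, here is a minimal sketch of how a PUB command could be assembled in Go, with and without a reply subject. The encodePub helper is a hypothetical name used only for illustration and is not part of the NATS client API. Note that the size argument counts bytes rather than characters, so a payload with multi-byte UTF-8 characters reports a larger size than its visible length.

```go
package main

import (
	"bytes"
	"strconv"
)

// encodePub is a hypothetical helper that builds the bytes of a PUB
// command as described above. An empty reply omits the reply subject.
func encodePub(subject, reply string, payload []byte) []byte {
	var buf bytes.Buffer
	buf.WriteString("PUB ")
	buf.WriteString(subject)
	if reply != "" {
		buf.WriteByte(' ')
		buf.WriteString(reply)
	}
	buf.WriteByte(' ')
	// The size is the payload length in bytes, not in characters.
	buf.WriteString(strconv.Itoa(len(payload)))
	buf.WriteString("\r\n")
	buf.Write(payload)
	buf.WriteString("\r\n")
	return buf.Bytes()
}
```

For instance, encodePub("hello", "inbox", []byte("world")) produces the same bytes as the second PUB example above.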

The official NATS Go client has emphasized the use of standard Go constructs, such as channels and goroutines, sometimes replacing custom-built functions with Go functions as they become available. If we wanted to, we could easily write a basic implementation of a NATS client in a few hundred lines using nothing outside of what the Go standard library provides.

A simple 80-line client that can publish messages to the server follows. Note that in the Publish() function, the payload is a slice of bytes that represents the data that will be sent to the server. This could be encoded in JSON, Protocol Buffers, or any other serialization format, but the server does not care because it will treat the payload as an opaque blob of application data.

package main

import (
        "bufio"
        "fmt"
        "log"
        "net"
        "sync"
)

// Client is a basic client to the NATS server.
type Client struct {
        conn net.Conn
        w    *bufio.Writer
        sync.Mutex
}

// NewClient returns a NATS client.
func NewClient() *Client {
        return &Client{}
}

// Connect establishes a connection to a NATS server.
func (c *Client) Connect(netloc string) error {
        conn, err := net.Dial("tcp", netloc)
        if err != nil {
                return err
        }
        c.conn = conn
        c.w = bufio.NewWriter(conn)

        return nil
}

// Publish takes a subject as an immutable string and payload in bytes,
// then sends the message to the server.
func (c *Client) Publish(subject string, payload []byte) error {
        c.Lock()
        pub := fmt.Sprintf("PUB %s %d\r\n", subject, len(payload))
        _, err := c.w.WriteString(pub)
        if err == nil {
                _, err = c.w.Write(payload)
        }
        if err == nil {
                _, err = c.w.WriteString("\r\n")
        }
        if err == nil {
                err = c.w.Flush()
        }
        c.Unlock()
        return err
}

// Close terminates a connection to NATS.
func (c *Client) Close() {
        c.Lock()
        defer c.Unlock()
        c.conn.Close()
}

func main() {
        nc := NewClient()
        err := nc.Connect("127.0.0.1:4222")
        if err != nil {
                log.Fatalf("Error: %s", err)
        }
        defer nc.Close()

        err = nc.Publish("hello", []byte("world"))
        if err != nil {
                log.Fatalf("Error: %s", err)
        }
}

Running the example client, we can confirm that the messages are being sent without errors by connecting with telnet to the same server the client publishes to and subscribing to the subject. The session below uses the demo.nats.io endpoint, and each run of the client shows up as one MSG.

telnet demo.nats.io 4222
...
sub hello 1 
+OK
MSG hello 1 5
world
MSG hello 1 5
world
MSG hello 1 5
world
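
If no server is at hand, the bytes written by the client can also be checked in-process. The following is only a sketch under stated assumptions: publishTo mirrors the Publish method above but takes the connection as a parameter, and capturePublish plays the role of the server using an in-memory connection from net.Pipe. Both names are hypothetical and exist only for this illustration.

```go
package main

import (
	"bufio"
	"fmt"
	"io"
	"net"
)

// publishTo writes a PUB command to conn, mirroring Publish above.
// bufio.Writer keeps the first write error and reports it on Flush.
func publishTo(conn net.Conn, subject string, payload []byte) error {
	w := bufio.NewWriter(conn)
	fmt.Fprintf(w, "PUB %s %d\r\n", subject, len(payload))
	w.Write(payload)
	w.WriteString("\r\n")
	return w.Flush()
}

// capturePublish returns what the "server" end of an in-memory
// connection receives when publishTo runs on the client end.
func capturePublish(subject string, payload []byte) (string, error) {
	clientEnd, serverEnd := net.Pipe()
	errCh := make(chan error, 1)
	go func() {
		err := publishTo(clientEnd, subject, payload)
		clientEnd.Close() // lets the reader below see EOF
		errCh <- err
	}()

	data, err := io.ReadAll(serverEnd)
	serverEnd.Close()
	if err != nil {
		return "", err
	}
	return string(data), <-errCh
}
```

Calling capturePublish("hello", []byte("world")) should return the PUB command exactly as the server would see it on the wire.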

In order to make the client receive messages, we need to support subscribing to a subject and parsing the protocol. The official NATS Go client uses an optimized parser that handles the protocol byte by byte, while also handling split buffer scenarios, but we can do a trivial implementation of a protocol parser in less than a hundred lines:

 
import (
        "bufio"
        "errors"
        "fmt"
        "io"
        "net"
        "strings"
)

type client interface {
        processInfo(line string)
        processMsg(subj string, reply string, sid int, payload []byte)
        processPing()
        processPong()
        processErr(msg string)
        processOpErr(err error)
}

// readLoop is run as a goroutine and processes commands
// sent by the server.
func readLoop(c client, conn net.Conn) {
        r := bufio.NewReader(conn)

        for {
                line, err := r.ReadString('\n')
                if err != nil {
                        c.processOpErr(err)
                        return
                }
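                // Split the control line into the operation and its arguments.
                // Commands such as PING or +OK carry no arguments, so pad
                // args to keep the args[1] lookups below from panicking.
                args := strings.SplitN(line, " ", 2)
                if len(args) < 2 {
                        args = append(args, "")
                }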

                op := strings.TrimSpace(args[0])
                switch op {
                case "MSG":
                        var subject, reply string
                        var sid, size int

                        n := strings.Count(args[1], " ")
                        switch n {
                        case 2:
                                // No reply is expected in this case (message is just broadcast).
                                // MSG foo 1 3\r\n
                                // bar\r\n
                                _, err := fmt.Sscanf(args[1], "%s %d %d",
                                        &subject, &sid, &size)
                                if err != nil {
                                        c.processOpErr(err)
                                        return
                                }
                        case 3:
                                // Reply is expected in this case (a request).
                                // MSG foo 1 bar 4\r\n
                                // quux\r\n
                                _, err := fmt.Sscanf(args[1], "%s %d %s %d",
                                        &subject, &sid, &reply, &size)
                                if err != nil {
                                        c.processOpErr(err)
                                        return
                                }
                        default:
                                c.processOpErr(errors.New("nats: bad control line"))
                                return
                        }

                        // Prepare buffer for the payload
                        payload := make([]byte, size)
                        _, err = io.ReadFull(r, payload)
                        if err != nil {
                                c.processOpErr(err)
                                return
                        }
                        // When the message carries no reply subject, reply is the empty string.
                        c.processMsg(subject, reply, sid, payload)
                case "INFO":
                        c.processInfo(args[1])
                case "PING":
                        c.processPing()
                case "PONG":
                        c.processPong()
                case "+OK":
                        // Do nothing.
                case "-ERR":
                        c.processErr(args[1])
                }
        }
}

Each time this code receives a message from the server, it calls processMsg and passes it everything needed to deliver the message: the subject, the optional reply subject, the subscription ID, and the payload.
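
The MSG branch of the parser relies on fmt.Sscanf and a count of spaces. As an alternative sketch, not the approach taken by the official client, the same arguments can be split with strings.Fields and converted with strconv.Atoi, which also makes it easy to reject a negative size before allocating the payload buffer. The msgArgs type and parseMsgArgs function are hypothetical names introduced for this example.

```go
package main

import (
	"errors"
	"strconv"
	"strings"
)

// msgArgs holds the arguments that follow "MSG " on a control line.
type msgArgs struct {
	subject, reply string
	sid, size      int
}

// parseMsgArgs parses "<subject> <sid> [reply] <size>", ignoring the
// trailing \r\n. The reply field stays empty when it is absent.
func parseMsgArgs(s string) (msgArgs, error) {
	var m msgArgs
	fields := strings.Fields(s)
	switch len(fields) {
	case 3:
		m.subject = fields[0]
	case 4:
		m.subject, m.reply = fields[0], fields[2]
	default:
		return m, errors.New("nats: bad control line")
	}

	var err error
	if m.sid, err = strconv.Atoi(fields[1]); err != nil {
		return m, err
	}
	// The size is always the last field.
	if m.size, err = strconv.Atoi(fields[len(fields)-1]); err != nil {
		return m, err
	}
	if m.size < 0 {
		return m, errors.New("nats: negative payload size")
	}
	return m, nil
}
```

With this helper, parseMsgArgs("hello 1 inbox 5\r\n") fills in the subject hello, the sid 1, the reply subject inbox, and the size 5.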

One of the arguments in processMsg is the subscription ID which the client used when registering the subscription. In the client, this subscription ID (sid) will be used to identify the callback which will be used to process the message.

Although the subscription ID is arbitrary, we have to make sure each subscription has a unique one. A simple way to do so is to define the ID as a monotonically increasing counter kept as part of the client state. Note that the ID is meaningful only to the client: the server simply passes it back as an opaque string when a message matches the subscription, so even though we use a number, a string would have worked too.

 
// MsgHandler is the callback invoked for each message delivered to a subscription.
type MsgHandler func(subject, reply string, b []byte)

// Subscription represents a subscription by the client.
type Subscription struct {
        cb  MsgHandler
}

// Client is a basic client to the NATS server.
type Client struct {
        conn net.Conn
        w    *bufio.Writer

        // sid increments monotonically per subscription and it is
        // used to identify a subscription from the client when
        // receiving a message.
        sid int

        // subs maps a subscription identifier to its subscription.
        subs map[int]*Subscription

        sync.Mutex
}

// NewClient returns a NATS client.
func NewClient() *Client {
        return &Client{
                subs: make(map[int]*Subscription),
        }
}

func (c *Client) processMsg(subj string, reply string, sid int, payload []byte) {
        c.Lock()
        sub, ok := c.subs[sid]
        c.Unlock()

        if ok {
                // NOTE: This blocks the parser read loop too!
                sub.cb(subj, reply, payload)
        }
}

func (c *Client) processPing() {
        c.Lock()
        defer c.Unlock()

        // Reply back to prevent stale connection error.
        c.w.WriteString("PONG\r\n")
        c.w.Flush()
}

Then, we implement a Subscribe method, which takes a callback that will be executed whenever we receive a message matching that subscription.

 
func (c *Client) Subscribe(subject string, cb MsgHandler) (*Subscription, error) {
        c.Lock()
        defer c.Unlock()
        c.sid++
        sid := c.sid

        sub := fmt.Sprintf("SUB %s %d\r\n", subject, sid)
        _, err := c.w.WriteString(sub)
        if err != nil {
                return nil, err
        }

        err = c.w.Flush()
        if err != nil {
                return nil, err
        }

        s := &Subscription{cb}
        c.subs[sid] = s

        return s, nil
}

Now we have a client that can handle publishing and receiving messages in just a few hundred lines, with an API that resembles the official NATS Go client. The following test program subscribes to the hello subject and publishes 10 messages to it, exiting once all 10 have been received. One could just as easily subscribe to any number of subjects.

 
func main() {
        nc := NewClient()
        err := nc.Connect("127.0.0.1:4222")
        if err != nil {
                log.Fatalf("Error: %s", err)
        }
        defer nc.Close()

        recv := 0
        doneCh := make(chan struct{})
        _, err = nc.Subscribe("hello", func(subject, reply string, payload []byte) {
                log.Printf("[Received] %s", string(payload))
                recv++
                if recv == 10 {
                        close(doneCh)
                }
        })
        if err != nil {
                log.Fatalf("Error: %s", err)
        }

        for i := 0; i < 10; i++ {
                err = nc.Publish("hello", []byte("world"))
                if err != nil {
                        log.Fatalf("Error: %s", err)
                }
        }

        <-doneCh
}

But as the volume of traffic goes up, our simple client will fail to respond with satisfactory speed to requests. Therefore, in the next article, Scaling messaging in Go network clients, we’ll explore techniques used in the standard NATS client to save time and make the client more viable for production use.
