Chapter 4. Dealing with State Change

It is not uncommon to have application processes that need to learn about changes to the state of a ZooKeeper ensemble. For instance, in our example in Chapter 1, backup masters need to know that the primary master has crashed, and workers need to know when new tasks have been assigned to them. ZooKeeper clients could, of course, poll the ZooKeeper ensemble periodically to determine whether changes have occurred. Polling, however, is not efficient, especially when the expected changes are somewhat rare.

For example, let’s consider backup masters; they need to know when the primary has crashed so that they can fail over. To reduce the time it takes to recover from the primary crash, we need to poll frequently; say, every 50 ms, as an example of aggressive polling. In this case, each backup master generates 20 requests/second. If there are multiple backup masters, we multiply this frequency by the number of backups to obtain the total request traffic generated just to poll ZooKeeper for the status of the primary master. Even if such an amount of traffic is easy for a system like ZooKeeper to deal with, primary master crashes should be rare, so most of this traffic is unnecessary. Suppose we therefore reduce the amount of polling traffic to ZooKeeper by increasing the period between requests for the status of the primary, say to 1 second. The problem with increasing this period is that it increases the time it takes to recover from a primary crash.
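To make the trade-off concrete, here is a back-of-the-envelope sketch of the arithmetic above. The class and method names are hypothetical and not part of ZooKeeper, and the detection delay ignores network and request latency:

```java
// Hypothetical helper, not part of ZooKeeper: rough cost of polling.
final class PollingCost {
    /** Requests per second sent to the ensemble by all backups together. */
    static double requestsPerSecond(long pollPeriodMs, int backupMasters) {
        return backupMasters * (1000.0 / pollPeriodMs);
    }

    /** Worst-case delay, in ms, before a poller notices a crash: one full period. */
    static long worstCaseDetectionMs(long pollPeriodMs) {
        return pollPeriodMs;
    }

    public static void main(String[] args) {
        for (long period : new long[] {50, 1000}) {
            System.out.println(period + " ms period, 3 backups: "
                + requestsPerSecond(period, 3) + " requests/s, up to "
                + worstCaseDetectionMs(period) + " ms to notice a crash");
        }
    }
}
```

Shortening the period lowers the detection delay but raises the request rate in the same proportion, which is the tuning problem described above.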

We can avoid this tuning and polling traffic altogether by having ZooKeeper notify interested clients of concrete events. The primary mechanism ZooKeeper provides to deal with changes is watches. With watches, a client registers its request to receive a one-time notification of a change to a given znode. For example, we can have the primary master create an ephemeral znode representing the master lock, and the backup masters register a watch for the existence of the master lock. If the primary crashes, the master lock is automatically deleted and the backup masters are notified. Once the backup masters receive their notifications, they can start a new master election by trying to create a new ephemeral znode, as we showed in “Getting Mastership”.

Watches and notifications form a general mechanism that enables clients to observe changes by other clients without having to continually poll ZooKeeper. We have illustrated the use of this mechanism with the master example, but the general mechanism is applicable to a wide variety of situations.

One-Time Triggers

Before getting deeper into watches, let’s establish some terminology. We talk about an event to denote the execution of an update to a given znode. A watch is a one-time trigger associated with a znode and a type of event (e.g., data is set in the znode, or the znode is deleted). When the watch is triggered by an event, it generates a notification. A notification is a message to the application client that registered the watch to inform this client of the event.

When an application process registers a watch to receive a notification, the watch is triggered at most once, upon the first event that matches the condition of the watch. For example, say that a client (e.g., a backup master) needs to know when a given znode /z is deleted. The client executes an exists operation on /z with the watch flag set and waits for the notification. The notification comes in the form of a callback to the application client.
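The at-most-once behavior can be modeled in a few lines. The following is only an in-memory sketch under our own assumptions (the OneShotWatches class is hypothetical and is not the ZooKeeper API); it shows that a second event on the same znode produces no notification unless the client registers a new watch:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// In-memory model of one-time triggers; hypothetical, not the ZooKeeper API.
final class OneShotWatches {
    private final Map<String, List<Runnable>> watches = new HashMap<>();

    /** Registers a watch on a path; it fires at most once. */
    void watch(String path, Runnable onEvent) {
        watches.computeIfAbsent(path, p -> new ArrayList<>()).add(onEvent);
    }

    /** Simulates an event on a path: fires and discards every pending watch. */
    void event(String path) {
        List<Runnable> pending = watches.remove(path);
        if (pending != null) {
            pending.forEach(Runnable::run);
        }
    }

    public static void main(String[] args) {
        OneShotWatches zk = new OneShotWatches();
        int[] notifications = {0};
        zk.watch("/z", () -> notifications[0]++);
        zk.event("/z"); // fires the watch
        zk.event("/z"); // no watch left, so no notification
        System.out.println("notifications: " + notifications[0]);
    }
}
```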

Each watch is associated with the session in which the client sets it. If the session expires, pending watches are removed. Watches do, however, persist across connections to different servers. Say that a ZooKeeper client disconnects from a ZooKeeper server and connects to a different server in the ensemble. The client will send a list of outstanding watches. When re-registering the watch, the server will check to see if the watched znode has changed since the previous registration. If the znode has changed, a watch event will be sent to the client; otherwise, the watch will be re-registered at the new server. This behavior of re-registering watches can be turned off by setting the system property zookeeper.disableAutoWatchReset.

Wait, Can I Miss Events with One-Time Triggers?

The short answer is “yes”: an application can miss events between receiving a notification and registering for another watch. However, this issue deserves more discussion. Missing events is typically not a problem because any changes that have occurred during the period between receiving a notification and registering a new watch can be captured by reading the state of ZooKeeper directly.

Say that a worker receives a notification indicating that a new task has been assigned to it. To receive the new task, the worker reads the list of tasks. If several more tasks have been assigned to the worker after it received the notification, reading the list of tasks via a getChildren call returns all tasks. The getChildren call also sets a new watch, guaranteeing that the worker will not miss tasks.

Actually, having one notification amortized across multiple events is a positive aspect. It makes the notification mechanism much more lightweight than sending a notification for every event for applications that have a high rate of updates. To give a trivial example, if every notification captures two events on average, we are generating only 0.5 notifications per event instead of 1 notification per event.
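A small simulation illustrates the amortization. It is a sketch with hypothetical names that models a task list guarded by a one-time trigger; it does not use the ZooKeeper client. Two tasks arrive after a single read, the worker gets one notification, and its next read returns both tasks:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory model: one notification covering several events.
final class AmortizedNotifications {
    private final List<String> tasks = new ArrayList<>();
    private boolean watchSet = false;
    int events = 0;
    int notifications = 0;

    /** Like getChildren with a watch: returns a snapshot and arms the watch. */
    List<String> readAndWatch() {
        watchSet = true;
        return new ArrayList<>(tasks);
    }

    /** An update; notifies only if a watch is currently armed. */
    void addTask(String task) {
        tasks.add(task);
        events++;
        if (watchSet) {
            watchSet = false; // one-time trigger
            notifications++;
        }
    }

    public static void main(String[] args) {
        AmortizedNotifications zk = new AmortizedNotifications();
        zk.readAndWatch();                     // initial read arms the watch
        zk.addTask("task-1");                  // triggers the single notification
        zk.addTask("task-2");                  // arrives before the worker re-reads
        List<String> seen = zk.readAndWatch(); // one read returns both tasks
        System.out.println(seen + ": " + zk.notifications
            + " notification(s) for " + zk.events + " events");
    }
}
```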

Getting More Concrete: How to Set Watches

All read operations in the ZooKeeper API—getData, getChildren, and exists—have the option to set a watch on the znode they read. To use the watch mechanism, we need to implement the Watcher interface, which consists of implementing a process method:

public void process(WatchedEvent event);

The WatchedEvent data structure contains the following:

  • The state of the ZooKeeper session (KeeperState): Disconnected, SyncConnected, AuthFailed, ConnectedReadOnly, SaslAuthenticated, or Expired

  • The event type (EventType): NodeCreated, NodeDeleted, NodeDataChanged, NodeChildrenChanged, or None

  • A znode path in the case that the event type is not None

The first three events refer to a single znode, whereas the fourth event concerns the children of the znode on which it is issued. We use None when the watched event is for a change of the state of the ZooKeeper session.

There are two types of watches: data watches and child watches. Creating, deleting, or setting the data of a znode successfully triggers a data watch. exists and getData both set data watches. Only getChildren sets child watches, which are triggered when a child znode is either created or deleted. For each event type, we have the following calls for setting a watch:

NodeCreated

A watch is set with a call to exists.

NodeDeleted

A watch is set with a call to either exists or getData.

NodeDataChanged

A watch is set with either exists or getData.

NodeChildrenChanged

A watch is set with getChildren.
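The list above can be restated as a lookup table. In this sketch the Event enum is a local stand-in for ZooKeeper's EventType, and canDeliver is a hypothetical helper, so that the code compiles without the client library:

```java
import java.util.Arrays;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;

// Lookup table restating the list above. The enum is a local stand-in
// for ZooKeeper's EventType; it is not the real class.
final class WatchSetters {
    enum Event { NodeCreated, NodeDeleted, NodeDataChanged, NodeChildrenChanged }

    static final Map<Event, List<String>> CALLS = new EnumMap<>(Event.class);
    static {
        CALLS.put(Event.NodeCreated, Arrays.asList("exists"));
        CALLS.put(Event.NodeDeleted, Arrays.asList("exists", "getData"));
        CALLS.put(Event.NodeDataChanged, Arrays.asList("exists", "getData"));
        CALLS.put(Event.NodeChildrenChanged, Arrays.asList("getChildren"));
    }

    /** True if a watch set by the given read call can deliver the given event. */
    static boolean canDeliver(String call, Event event) {
        return CALLS.get(event).contains(call);
    }

    public static void main(String[] args) {
        System.out.println(canDeliver("getData", Event.NodeCreated));
        System.out.println(canDeliver("exists", Event.NodeCreated));
    }
}
```

Note that getData cannot be used to wait for a znode to be created, because the call fails, and sets no watch, when the znode does not exist.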

When creating a ZooKeeper object (see Chapter 3), we need to pass a default Watcher object. The ZooKeeper client uses this watcher to notify the application of changes to the ZooKeeper state, in case the state of the session changes. For event notifications related to ZooKeeper znodes, you can either use the default watcher or implement a different one. For example, the getData call has two different ways of setting a watch:

public byte[] getData(final String path, Watcher watcher, Stat stat);
public byte[] getData(String path, boolean watch, Stat stat);

Both signatures pass the znode as the first argument. The first signature passes a new Watcher object (which we must have created). The second signature tells the client to use the default watcher, and only requires true as the second parameter of the call.

The stat input parameter is an instance of the Stat structure that ZooKeeper uses to return information about the znode designated by path. The Stat structure contains information about the znode, such as the zxid (transaction ID) of the last change to this znode and the number of children of the znode.

One important observation about watches is that it is not possible to remove them once set in the 3.4 branch and earlier versions. The only two ways to remove a watch are to have it triggered or for its session to be closed or expired. This behavior changes in the 3.5 branch, and starting from that branch it is possible to deregister watches with the removeWatches call.

A Bit of Overloading

We use the same watch mechanism for notifying the application of events related to the state of a ZooKeeper session and events related to znode changes. Although session state changes and znode state changes constitute independent sets of events, we rely upon the same mechanism to deliver such events for simplicity.

A Common Pattern

Before we get into some snippets for the master–worker example, let’s take a quick look at a pretty common code pattern used in ZooKeeper applications:

  1. Make an asynchronous call.

  2. Implement a callback object and pass it to the asynchronous call.

  3. If the operation requires setting a watch, then implement a Watcher object and pass it on to the asynchronous call.

A code sample for this pattern using an asynchronous exists call looks like this:

zk.exists("/myZnode", // 1
          myWatcher,
          existsCallback,
          null);

Watcher myWatcher = new Watcher() { // 2
    public void process(WatchedEvent e) {
        // Process the watch event
    }
};

StatCallback existsCallback = new StatCallback() { // 3
    public void processResult(int rc, String path, Object ctx, Stat stat) {
        // Process the result of the exists call
    }
};
1

ZooKeeper exists call. Note that the call is asynchronous.

2

Watcher implementation.

3

exists callback.

As we will see next, we’ll make extensive use of this skeleton.

The Master–Worker Example

Let’s now look at how we deal with changes of state in the master–worker example. Here is a list of tasks that require a component to wait for changes:

  • Mastership changes

  • Master waits for changes to the list of workers

  • Master waits for new tasks to assign

  • Worker waits for new task assignments

  • Client waits for task execution result

We next show some code snippets to illustrate how to code these tasks with ZooKeeper. We provide the complete example code as part of the additional material to this book.

Mastership Changes

Recall from “Getting Mastership” that an application client elects itself master by creating the /master znode (we call this “running for master”). If the znode already exists, the application client determines that it is not the primary master and returns. That implementation, however, does not tolerate a crash of the primary master. If the primary master crashes, the backup masters won’t know about it. Consequently, we need to set a watch on /master so that ZooKeeper notifies the client when /master is deleted (either explicitly or because the session of the primary master has expired).

To set the watch, we create a new watcher named masterExistsWatcher and pass it to exists. Upon a notification of /master being deleted, the process call defined in masterExistsWatcher calls runForMaster:

StringCallback masterCreateCallback = new StringCallback() {
    public void processResult(int rc, String path, Object ctx, String name) {
        switch (Code.get(rc)) {
        case CONNECTIONLOSS:
            checkMaster(); // 1

            break;
        case OK:
            state = MasterStates.ELECTED;
            takeLeadership(); // 2

            break;
        case NODEEXISTS:
            state = MasterStates.NOTELECTED;
            masterExists(); // 3

            break;
        default:
            state = MasterStates.NOTELECTED;
            LOG.error("Something went wrong when running for master.", // 4
                      KeeperException.create(Code.get(rc), path));
        }
    }
};

void masterExists() {
    zk.exists("/master",
              masterExistsWatcher, // 5
              masterExistsCallback,
              null);
}

Watcher masterExistsWatcher = new Watcher() {
    public void process(WatchedEvent e) {
        if(e.getType() == EventType.NodeDeleted) {
            assert "/master".equals( e.getPath() );

            runForMaster(); // 6
        }
    }
};
1

In the case of a connection loss event, the client checks if the /master znode is there because it doesn’t know if it has been able to create it or not.

2

If OK, then it simply takes leadership.

3

If someone else has already created the znode, then the client needs to watch it.

4

If anything unexpected happens, then it logs the error and doesn’t do anything else.

5

This exists call is to set a watch on the /master znode.

6

If the /master znode is deleted, then it runs for master again.

Following the asynchronous style we used in “Getting Mastership Asynchronously”, we also create a callback method for the exists call that takes care of a few cases. First, in the case of connection loss, it retries the exists operation. Second, it is possible for the /master znode to be deleted between the execution of the create callback and the execution of the exists operation. If that happens, then the callback is invoked with NONODE and we run for master again. For all other cases, we check for the /master znode by getting its data. The final case is the client session expiring. In this case, the callback to get the data of /master logs an error message and exits. Our exists callback looks like this:

StatCallback masterExistsCallback = new StatCallback() {
    public void processResult(int rc, String path, Object ctx, Stat stat) {
        switch (Code.get(rc)) {
        case CONNECTIONLOSS:
            masterExists(); // 1

            break;
        case OK:
            break; // 2
        case NONODE:
            state = MasterStates.RUNNING;
            runForMaster(); // 3

            break;
        default:
            checkMaster(); // 4
            break;
        }
    }
};
1

In the case of a connection loss, just try again.

2

If it returns OK, then there is nothing to do.

3

If it returns NONODE, run for master.

4

If something unexpected happens, check if /master is there by getting its data.

The result of the exists operation over /master might be that the znode has been deleted. In this case, the client needs to run for /master again because it is not guaranteed that the watch was set before the znode was deleted. If the new attempt to become primary fails, then the client knows that some other client succeeded and it tries to watch /master again. If the notification for /master indicates that it has been created instead of deleted, the client does not run for /master. At the same time, the corresponding exists operation (the one that has set the watch) must have returned that /master doesn’t exist, which triggers the procedure to run for /master from the exists callback.

Note that this pattern of running for master and executing exists to set a watch on /master continues for as long as the client runs and does not become a primary master. If it becomes the primary master and eventually crashes, the client can restart and reexecute this code.

Figure 4-1 makes the possible interleavings of operations more explicit. If the create operation executed when running for primary master succeeds (a), the application client doesn’t have to do anything else. If the create operation fails because the znode already exists, then the client executes an exists operation to set a watch on the /master znode (b). Between running for master and executing the exists operation, it is possible that the /master znode gets deleted. Let’s assume first that the znode gets deleted before the response to exists is generated. In this case, the client runs for master again (c). Let’s now assume that the response to the exists call is generated before the znode is deleted and indicates that the znode exists. When the znode is deleted, ZooKeeper triggers the watch and the client eventually receives the notification and runs for master again (d).

Figure 4-1. Running for master, possible interleavings

Master Waits for Changes to the List of Workers

New workers may be added to the system and old workers may be decommissioned at any time. Workers might also crash before executing their assignments. To determine the workers that are available at any one time, we register new workers with ZooKeeper by adding a znode as a child of /workers. When a worker crashes or is simply removed from the system, its session expires, automatically causing its znode to be removed. Workers ideally close their sessions without making ZooKeeper wait for a session expiration.

The primary master uses getChildren to obtain the list of available workers and to watch for changes to the list. Here is some sample code to obtain the list and watch for changes:

Watcher workersChangeWatcher = new Watcher() { // 1
    public void process(WatchedEvent e) {
        if(e.getType() == EventType.NodeChildrenChanged) {
            assert "/workers".equals( e.getPath() );

            getWorkers();
        }
    }
};

void getWorkers() {
    zk.getChildren("/workers",
                    workersChangeWatcher,
                    workersGetChildrenCallback,
                    null);
}

ChildrenCallback workersGetChildrenCallback = new ChildrenCallback() {
    public void processResult(int rc, String path, Object ctx,
                              List<String> children) {
        switch (Code.get(rc)) {
        case CONNECTIONLOSS:
            getWorkers(); // 2

            break;
        case OK:
            LOG.info("Successfully got a list of workers: "
                     + children.size()
                     + " workers");
            reassignAndSet(children); // 3

            break;
        default:
            LOG.error("getChildren failed",
                      KeeperException.create(Code.get(rc), path));
        }
    }
};
1

workersChangeWatcher is the watcher for the list of workers.

2

In the case of a CONNECTIONLOSS event, we need to reexecute the operation to obtain the children and set the watch.

3

This call reassigns tasks of dead workers and sets the new list of workers.

We start by calling getWorkers. This call executes getChildren asynchronously, passing workersGetChildrenCallback to process the result of the operation. If the client disconnects from a server (CONNECTIONLOSS event), the watch is not set and we don’t have a list of workers; we execute getWorkers again to set the watch and obtain the list of workers. Upon a successful execution of getChildren, we call reassignAndSet as follows:

ChildrenCache workersCache; // 1

void reassignAndSet(List<String> children) {
    List<String> toProcess;

    if(workersCache == null) {
        workersCache = new ChildrenCache(children); // 2
        toProcess = null; // 3
    } else {
        LOG.info( "Removing and setting" );
        toProcess = workersCache.removedAndSet( children ); // 4
    }

    if(toProcess != null) {
        for(String worker : toProcess) {
            getAbsentWorkerTasks(worker); // 5
        }
    }
}
1

Here’s the cache that holds the last set of workers we have seen.

2

If this is the first time it is using the cache, then instantiate it.

3

The first time we get workers, there is nothing to do.

4

If it is not the first time, then we need to check if some worker has been removed.

5

If there is any worker that has been removed, then we need to reassign its tasks.

We use the cache because we need to remember what we have seen before. Say that we get the list of workers for the first time. When we get the notification that the list of workers has changed, we won’t know what exactly has changed even after reading it again unless we keep the old values. The cache class for this example simply keeps the last list the master has read and implements a couple of methods to determine what has changed.
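To give an idea of what such a cache might look like, here is a minimal sketch. It is our own illustration and may differ from the ChildrenCache class in the example code that accompanies the book; in particular, the addedAndSet method and the convention of returning null when nothing changed are assumptions, the latter chosen to match the null check in reassignAndSet:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Sketch of a cache like the one described above; the book's own
// ChildrenCache may differ in names and behavior.
final class ChildrenCacheSketch {
    private List<String> children;

    ChildrenCacheSketch(List<String> children) {
        this.children = new ArrayList<>(children);
    }

    /** Stores the new list and returns the entries that disappeared, or null if none. */
    List<String> removedAndSet(List<String> newChildren) {
        List<String> removed = new ArrayList<>(children);
        removed.removeAll(newChildren);
        children = new ArrayList<>(newChildren);
        return removed.isEmpty() ? null : removed;
    }

    /** Stores the new list and returns the entries that are new, or null if none. */
    List<String> addedAndSet(List<String> newChildren) {
        List<String> added = new ArrayList<>(newChildren);
        added.removeAll(children);
        children = new ArrayList<>(newChildren);
        return added.isEmpty() ? null : added;
    }

    public static void main(String[] args) {
        ChildrenCacheSketch cache =
            new ChildrenCacheSketch(Arrays.asList("worker-1", "worker-2"));
        System.out.println(cache.removedAndSet(Arrays.asList("worker-2", "worker-3")));
    }
}
```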

Watch upon CONNECTIONLOSS

A watch for a znode is set only if the operation is successful. If the ZooKeeper operation fails to be executed because the client has disconnected, then the application needs to call it again.

Master Waits for New Tasks to Assign

Like waiting for changes to the list of workers, the primary master waits for new tasks to be added to /tasks. The master initially obtains the set of current tasks and sets a watch for changes to the set. The set is represented in ZooKeeper by the children of /tasks, and each child corresponds to a task. Once the master obtains tasks that have not yet been assigned, it selects a worker at random and assigns the task to the worker. We implement the assignment in assignTasks:

Watcher tasksChangeWatcher = new Watcher() { // 1
    public void process(WatchedEvent e) {
        if(e.getType() == EventType.NodeChildrenChanged) {
            assert "/tasks".equals( e.getPath() );

            getTasks();
        }
    }
};

void getTasks() {
    zk.getChildren("/tasks",
                   tasksChangeWatcher,
                   tasksGetChildrenCallback,
                   null); // 2
}

ChildrenCallback tasksGetChildrenCallback = new ChildrenCallback() {
    public void processResult(int rc,
                              String path,
                              Object ctx,
                              List<String> children) {
        switch(Code.get(rc)) {
        case CONNECTIONLOSS:
            getTasks();

            break;
        case OK:
            if(children != null) {
                assignTasks(children); // 3
            }

            break;
        default:
            LOG.error("getChildren failed.",
                      KeeperException.create(Code.get(rc), path));
        }
    }
};
1

Watcher implementation to handle a notification that the list of tasks has changed.

2

Get the list of tasks.

3

Assign tasks in the list.

Now we’ll implement assignTasks. It simply assigns each of the tasks in the list of children of /tasks. Before creating the assignment znode, we get the task data with getData:

void assignTasks(List<String> tasks) {
    for(String task : tasks) {
        getTaskData(task);
    }
}

void getTaskData(String task) {
    zk.getData("/tasks/" + task,
               false,
               taskDataCallback,
               task); // 1
}

DataCallback taskDataCallback = new DataCallback() {
    public void processResult(int rc,
                              String path,
                              Object ctx,
                              byte[] data,
                              Stat stat)  {
        switch(Code.get(rc)) {
        case CONNECTIONLOSS:
            getTaskData((String) ctx);

            break;
        case OK:
            /*
             * Choose worker at random.
             */
            int worker = rand.nextInt(workerList.size());
            String designatedWorker = workerList.get(worker);

            /*
             * Assign task to randomly chosen worker.
             */
            String assignmentPath = "/assign/" + designatedWorker + "/" +
                                     (String) ctx;
            createAssignment(assignmentPath, data); // 2

            break;
        default:
            LOG.error("Error when trying to get task data.",
                    KeeperException.create(Code.get(rc), path));
        }
    }
};
1

Get task data.

2

Select a worker randomly and assign the task to this worker.
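The worker selection can be isolated and tried on its own. The helper below is hypothetical (it is not part of the example code) and also guards against an empty worker list, a case the callback above does not handle: Random.nextInt throws IllegalArgumentException when its bound is zero.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Random;

// Hypothetical helper that isolates the worker choice made in taskDataCallback.
final class AssignmentPaths {
    /** Returns /assign/<worker>/<task>, or null when no worker is available. */
    static String choose(List<String> workers, String task, Random rand) {
        if (workers.isEmpty()) {
            return null; // rand.nextInt(0) would throw IllegalArgumentException
        }
        String worker = workers.get(rand.nextInt(workers.size()));
        return "/assign/" + worker + "/" + task;
    }

    public static void main(String[] args) {
        List<String> workers = Arrays.asList("worker-1", "worker-2", "worker-3");
        System.out.println(choose(workers, "task-0000000001", new Random()));
    }
}
```

What the master should do when no worker is available (for example, retry later) is a design decision that this sketch leaves open.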

We need to get the task data first because we delete the task znode under /tasks after assigning it. This way the master doesn’t have to remember which tasks it has assigned. Let’s look at the code for assigning a task:

void createAssignment(String path, byte[] data) {
    zk.create(path,
            data, Ids.OPEN_ACL_UNSAFE,
            CreateMode.PERSISTENT,
            assignTaskCallback,
            data); // 1
}

StringCallback assignTaskCallback = new StringCallback() {
    public void processResult(int rc, String path, Object ctx, String name) {
        switch(Code.get(rc)) {
        case CONNECTIONLOSS:
            createAssignment(path, (byte[]) ctx);

            break;
        case OK:
            LOG.info("Task assigned correctly: " + name);
            deleteTask(name.substring( name.lastIndexOf("/") + 1 )); // 2

            break;
        case NODEEXISTS:
            LOG.warn("Task already assigned");

            break;
        default:
            LOG.error("Error when trying to assign task.",
                      KeeperException.create(Code.get(rc), path));
        }
    }
};
1

Create an assignment. The path is of the form /assign/worker-id/task-num.

2

Delete the task znode under /tasks.

For new tasks, after the master selects a worker to assign the task to, it creates a znode under /assign/worker-id, where id is the identifier of the worker. Next, it deletes the znode from the list of pending tasks. The code for deleting the znode in the previous example follows the pattern of earlier code we have shown.

When the master creates an assignment znode for a worker with identifier id, ZooKeeper generates a notification for the worker, assuming that the worker has a watch registered upon its assignment znode (/assign/worker-id).

Note that the master also deletes the task znode under /tasks after assigning it successfully. This approach simplifies the role of the master when it receives new tasks to assign. If the list of tasks mixed the assigned and unassigned tasks, the master would need a way to disambiguate the tasks.

Worker Waits for New Task Assignments

One of the first steps a worker has to execute is to register itself with ZooKeeper. It does this by creating a znode under /workers, as we already discussed:

void register() {
        zk.create("/workers/worker-" + serverId,
                  new byte[0],
                  Ids.OPEN_ACL_UNSAFE,
                  CreateMode.EPHEMERAL,
                  createWorkerCallback, null); // 1
}

StringCallback createWorkerCallback = new StringCallback() {
    public void processResult(int rc, String path, Object ctx, String name) {
        switch (Code.get(rc)) {
        case CONNECTIONLOSS:
            register(); // 2

            break;
        case OK:
            LOG.info("Registered successfully: " + serverId);

            break;
        case NODEEXISTS:
            LOG.warn("Already registered: " + serverId);

            break;
        default:
            LOG.error("Something went wrong: " +
                      KeeperException.create(Code.get(rc), path));
        }
    }
};
1

Register the worker by creating a znode.

2

Try again. Note that registering again is not a problem. If the znode has already been created, we get a NODEEXISTS event back.

Adding this znode signals to the master that this worker is active and ready to process tasks. In order to simplify the example, we don’t use the idle/busy status (introduced in Chapter 3).

We similarly create a znode /assign/worker-id so that the master can assign tasks to this worker. If we create /workers/worker-id before /assign/worker-id, we could fall into the situation in which the master tries to assign the task but cannot because the parent znode for the assignment has not been created yet. To avoid this situation, we need to create /assign/worker-id first. Moreover, the worker needs to set a watch on /assign/worker-id to receive a notification when a new task is assigned.
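The ordering constraint follows from the fact that ZooKeeper rejects a create whose parent znode does not exist. The sketch below models only that rule with a hypothetical in-memory tree; it is not the ZooKeeper API, and its boolean return value collapses the distinct error codes (such as NONODE and NODEEXISTS) into a single failure:

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical in-memory znode tree: create fails when the parent is
// missing, mirroring the NONODE error ZooKeeper returns in that case.
final class ParentFirst {
    private final Set<String> znodes = new HashSet<>();

    ParentFirst() {
        znodes.add("/");
    }

    /** Returns true if the znode was created; false if the parent is missing or it exists. */
    boolean create(String path) {
        int slash = path.lastIndexOf('/');
        String parent = slash == 0 ? "/" : path.substring(0, slash);
        if (!znodes.contains(parent)) {
            return false; // parent has not been created yet
        }
        return znodes.add(path); // false if the znode already exists
    }

    public static void main(String[] args) {
        ParentFirst tree = new ParentFirst();
        tree.create("/assign");
        // The master tries to assign before the worker created its assign znode.
        System.out.println(tree.create("/assign/worker-1/task-1"));
        tree.create("/assign/worker-1");
        System.out.println(tree.create("/assign/worker-1/task-1"));
    }
}
```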

Once the worker has tasks assigned to it, it fetches the tasks from /assign/worker-id and executes them. The worker takes each task in its list and verifies whether it has already queued the task for execution. It keeps a list of ongoing tasks for this purpose. Note that we loop through the assigned tasks of a worker in a separate thread to release the callback thread. Otherwise, we would be blocking other incoming callbacks. In our example, we use a Java ThreadPoolExecutor to allocate a thread to loop through the tasks:

Watcher newTaskWatcher = new Watcher() {
    public void process(WatchedEvent e) {
        if(e.getType() == EventType.NodeChildrenChanged) {
            assert ("/assign/worker-" + serverId).equals( e.getPath() );

            getTasks(); // 1
        }
    }
};

void getTasks() {
    zk.getChildren("/assign/worker-" + serverId,
                   newTaskWatcher,
                   tasksGetChildrenCallback,
                   null);
}

ChildrenCallback tasksGetChildrenCallback = new ChildrenCallback() {
    public void processResult(int rc,
                              String path,
                              Object ctx,
                              List<String> children) {
        switch(Code.get(rc)) {
        case CONNECTIONLOSS:
            getTasks();
            break;
        case OK:
            if(children != null) {
                executor.execute(new Runnable() { // 2
                    List<String> children;
                    DataCallback cb;

                    public Runnable init (List<String> children,
                                          DataCallback cb) {
                        this.children = children;
                        this.cb = cb;

                        return this;
                    }

                    public void run() {
                        LOG.info("Looping into tasks");
                        synchronized(onGoingTasks) {
                            for(String task : children) { // 3
                                if(!onGoingTasks.contains( task )) {
                                    LOG.trace("New task: {}", task);
                                    zk.getData("/assign/worker-" +
                                               serverId + "/" + task,
                                               false,
                                               cb,
                                               task); // 4
                                    onGoingTasks.add( task ); // 5
                                }
                            }
                        }
                    }
                }
                .init(children, taskDataCallback));
            }
            break;
        default:
            LOG.error("getChildren failed",
                      KeeperException.create(Code.get(rc), path));
        }
    }
};
1

Upon receiving a notification that the children have changed, get the list of children.

2

Execute in a separate thread.

3

Loop through the list of children.

4

Get task data to execute it.

5

Add task to the list of tasks being executed to avoid executing it multiple times.
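Because getChildren returns the complete list of assignments each time, the worker sees tasks it has already started whenever a new one arrives. The de-duplication step can be factored out as in the following sketch; the OngoingTasks class and its claimNew method are hypothetical names, not part of the example code:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical extraction of the de-duplication step from the worker callback.
final class OngoingTasks {
    private final Set<String> onGoingTasks = new HashSet<>();

    /** Marks unseen tasks as ongoing and returns only those. */
    synchronized List<String> claimNew(List<String> children) {
        List<String> fresh = new ArrayList<>();
        for (String task : children) {
            if (onGoingTasks.add(task)) { // add returns false if already present
                fresh.add(task);
            }
        }
        return fresh;
    }

    public static void main(String[] args) {
        OngoingTasks ongoing = new OngoingTasks();
        System.out.println(ongoing.claimNew(Arrays.asList("task-1", "task-2")));
        // A later notification returns the full list again; only task-3 is new.
        System.out.println(ongoing.claimNew(Arrays.asList("task-1", "task-2", "task-3")));
    }
}
```

The method is synchronized for the same reason the listing above synchronizes on onGoingTasks: the loop runs on an executor thread, so several invocations may overlap.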

Session Events and Watchers

When we disconnect from a server (for example, when the server crashes), no watches are delivered until the connection is reestablished. For this reason, session events like CONNECTIONLOSS are sent to all outstanding watch handlers. In general, applications use session events to go into a safe mode: the ZooKeeper client does not receive events while disconnected, so it should act conservatively in this state. In the case of our toy master–worker application, all actions except submitting a task are reactive, so if a master or a worker is disconnected, it simply does not trigger any action. Also, the master–worker client is not able to submit new tasks and it does not receive status notifications while disconnected.
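One simple way to act conservatively is to record the latest session state and consult it before any proactive operation. This is only a sketch: SessionGate is a hypothetical class, and its enum is a local stand-in for the session states that the client reports through the watcher:

```java
// Hypothetical guard for proactive operations. The enum is a local
// stand-in for the session states reported through the watcher.
final class SessionGate {
    enum SessionState { SYNC_CONNECTED, DISCONNECTED, EXPIRED }

    private volatile SessionState state = SessionState.DISCONNECTED;

    /** Called when a session event arrives. */
    void onSessionEvent(SessionState newState) {
        state = newState;
    }

    /** Proactive operations, such as submitting a task, run only while connected. */
    boolean mayAct() {
        return state == SessionState.SYNC_CONNECTED;
    }

    public static void main(String[] args) {
        SessionGate gate = new SessionGate();
        gate.onSessionEvent(SessionState.SYNC_CONNECTED);
        System.out.println("connected, may act: " + gate.mayAct());
        gate.onSessionEvent(SessionState.DISCONNECTED);
        System.out.println("disconnected, may act: " + gate.mayAct());
    }
}
```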

Client Waits for Task Execution Result

Suppose an application client has submitted a task. Now it needs to know when it has been executed and its status. Recall that once a worker executes a task, it creates a znode under /status. Let’s first check the code to submit a task for execution:

void submitTask(String task, TaskObject taskCtx) {
    taskCtx.setTask(task);
    zk.create("/tasks/task-",
              task.getBytes(),
              Ids.OPEN_ACL_UNSAFE,
              CreateMode.PERSISTENT_SEQUENTIAL,
              createTaskCallback,
              taskCtx); 1
}

StringCallback createTaskCallback = new StringCallback() {
    public void processResult(int rc, String path, Object ctx, String name) {
        switch (Code.get(rc)) {
        case CONNECTIONLOSS:
            submitTask(((TaskObject) ctx).getTask(),
                         (TaskObject) ctx); 2

            break;
        case OK:
            LOG.info("My created task name: " + name);
            ((TaskObject) ctx).setTaskName(name);
            watchStatus("/status/" + name.replace("/tasks/", ""),
                         ctx); 3

            break;
        default:
            LOG.error("Something went wrong: " +
                       KeeperException.create(Code.get(rc), path));
        }
    }
};
1

The context object here is an instance of the TaskObject class.

2

Resubmit the task upon a connection loss. Note that resubmitting may create a duplicate of the task.

3

Set a watch on the status znode for this task.

Has My Sequential Znode Been Created?

Dealing with a CONNECTIONLOSS event when trying to create a sequential znode is somewhat tricky. Because ZooKeeper assigns the sequence number, it is not possible for the disconnected client to determine whether the znode has been created when there might be concurrent requests to create sequential znodes from other clients. (Note that all the create requests discussed in this note refer to the children of the same znode.)

To overcome this limitation, we have to give some hint about the originator of the znode, like having the server ID as part of the task name. Using this approach, it is possible to determine whether the task has been created by listing all task znodes.
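
As a minimal sketch of this idea, assume a hypothetical naming scheme in which each client creates its tasks with the prefix task-<originatorId>-, so the sequence number that ZooKeeper appends follows the identifier. The TaskNameHint helper below is not part of the ZooKeeper API; it only inspects a list of child names such as the one getChildren returns for /tasks, and it assumes originator IDs contain no hyphen.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Sketch only: TaskNameHint and its methods are hypothetical helpers,
// not part of the ZooKeeper API.
final class TaskNameHint {
    // Path prefix passed to create(); ZooKeeper appends the sequence number.
    static String createPrefix(String originatorId) {
        return "/tasks/task-" + originatorId + "-";
    }

    // Given the child names of /tasks, return those created by originatorId.
    static List<String> createdBy(List<String> children, String originatorId) {
        String prefix = "task-" + originatorId + "-";
        List<String> mine = new ArrayList<>();
        for (String child : children) {
            if (child.startsWith(prefix)) {
                mine.add(child);
            }
        }
        return mine;
    }

    public static void main(String[] args) {
        List<String> children = Arrays.asList(
            "task-c1-0000000007", "task-c2-0000000008", "task-c1-0000000009");
        System.out.println(createdBy(children, "c1"));
    }
}
```

If a client can have several tasks outstanding at once, the originator ID alone does not identify a single submission; the hint would then also need a component that is unique per task.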

In watchStatus, we check whether the status znode already exists (the task may have been processed quickly) and set a watch. We provide a watcher implementation to react to the notification of the znode creation and a callback implementation for the exists call:

ConcurrentHashMap<String, Object> ctxMap =
    new ConcurrentHashMap<String, Object>();

void watchStatus(String path, Object ctx) {
    ctxMap.put(path, ctx);
    zk.exists(path,
              statusWatcher,
              existsCallback,
              ctx); 1
}

Watcher statusWatcher = new Watcher() {
    public void process(WatchedEvent e) {
        if(e.getType() == EventType.NodeCreated) {
            assert e.getPath().contains("/status/task-");

            zk.getData(e.getPath(),
                       false,
                       getDataCallback,
                       ctxMap.get(e.getPath()));
        }
    }
};

StatCallback existsCallback = new StatCallback() {
    public void processResult(int rc, String path, Object ctx, Stat stat) {
        switch (Code.get(rc)) {
        case CONNECTIONLOSS:
            watchStatus(path, ctx);

            break;
        case OK:
            if(stat != null) {
                zk.getData(path, false, getDataCallback, ctx); 2
            }

            break;
        case NONODE:
            break; 3
        default:
            LOG.error("Something went wrong when " +
                      "checking if the status node exists: " +
                      KeeperException.create(Code.get(rc), path));

            break;
        }
    }
};
1

The client propagates the context object here so that it can modify the task object (TaskObject) accordingly when it receives a notification for the status znode.

2

The status znode is already there, so the client needs to get it.

3

If the status znode is not there yet, which should typically be the case, the client does nothing.

An Alternative Way: Multiop

Multiop was not in the original design of ZooKeeper, but was added in version 3.4.0. Multiop enables the execution of multiple ZooKeeper operations in a block atomically. The execution is atomic in the sense that either all operations in a multiop block succeed or all fail. For example, we can delete a parent znode and its child in a multiop block. The only possible outcomes are that either both operations succeed or both fail. It is not possible for the parent to be deleted while leaving one of its children around, or vice versa.

To use the multiop feature:

  1. Create an Op object to represent each ZooKeeper operation you intend to execute through a multiop call. ZooKeeper provides an Op implementation for each of the operations that change state: create, delete, and setData.

  2. Obtain the Op object by calling the static method that Op provides for that operation (for example, Op.delete).

  3. Add this Op object to an Iterable Java object, such as a list.

  4. Call multi on the list.

The following example illustrates this process:

    Op deleteZnode(String z) { 1
        return Op.delete(z, -1); 2
    }

    ...

    List<OpResult> results = zk.multi(Arrays.asList(deleteZnode("/a/b"),
                                                    deleteZnode("/a"))); 3
1

Create an Op object for the delete call.

2

Return the object by calling the appropriate Op method.

3

Execute both delete calls as a unit by using the multi call and passing them as a list of Op instances.

The call to multi returns a list of OpResult objects, each one specialized to the corresponding operation. For example, for the delete operation we have a DeleteResult class, which extends OpResult. The methods and data exposed by each result object depend on the operation type. DeleteResult offers only equals and hashCode methods, whereas CreateResult exposes the path of the operation and a Stat object. In the presence of errors, ZooKeeper returns an instance of ErrorResult containing an error code.

The multi call also has an asynchronous version. Here are the signatures of the synchronous and asynchronous methods:

public List<OpResult> multi(Iterable<Op> ops) throws InterruptedException,
                                                     KeeperException;
public void multi(Iterable<Op> ops, MultiCallback cb, Object ctx);

Transaction is a wrapper for multi with a simpler interface. We obtain an instance of Transaction from the ZooKeeper handle, add operations, and commit the transaction. The previous example rewritten using Transaction looks like this:

Transaction t = zk.transaction();
t.delete("/a/b", -1);
t.delete("/a", -1);
List<OpResult> results = t.commit();

The commit call also has an asynchronous version that takes as input a MultiCallback object and a context object:

public void commit(MultiCallback cb, Object ctx);

Multiop can simplify our master–worker implementation in at least one place. When assigning a task, the master in previous examples has created the corresponding assignment znode and then deleted the task znode under /tasks. If the master crashes before deleting the znode under /tasks, we are left with a task in /tasks that has already been assigned. Using multiop, we can create the znode representing the assignment of the task under /assign and delete the znode representing the task under /tasks atomically. Using this approach, we guarantee that no task znode under /tasks has been already assigned. If a backup takes over the role of master, it is not necessary to disambiguate the tasks in /tasks: they are all unassigned.
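
To make the all-or-nothing behavior concrete, here is a toy in-memory model of the assignment step. ToyMultiop and ToyOp are hypothetical stand-ins written for this illustration, not ZooKeeper classes, and the model ignores details such as parent znodes and versions. It is a sketch of the semantics, not of how ZooKeeper implements them.

```java
import java.util.Arrays;
import java.util.List;
import java.util.TreeSet;

// Toy in-memory stand-in for a znode tree. It is NOT ZooKeeper code; it only
// mimics the all-or-nothing behavior of a multiop block for illustration.
final class ToyMultiop {
    // A toy operation: create or delete a single path.
    static final class ToyOp {
        final boolean create;
        final String path;
        ToyOp(boolean create, String path) { this.create = create; this.path = path; }
        static ToyOp create(String path) { return new ToyOp(true, path); }
        static ToyOp delete(String path) { return new ToyOp(false, path); }
    }

    private final TreeSet<String> znodes = new TreeSet<>();

    ToyMultiop(String... initial) { znodes.addAll(Arrays.asList(initial)); }

    boolean exists(String path) { return znodes.contains(path); }

    // Apply every op to a scratch copy; publish the copy only if all succeed.
    boolean multi(List<ToyOp> ops) {
        TreeSet<String> scratch = new TreeSet<>(znodes);
        for (ToyOp op : ops) {
            boolean ok = op.create ? scratch.add(op.path) : scratch.remove(op.path);
            if (!ok) {
                return false; // one failure aborts the block; znodes is untouched
            }
        }
        znodes.clear();
        znodes.addAll(scratch);
        return true;
    }

    public static void main(String[] args) {
        ToyMultiop tree = new ToyMultiop("/tasks/task-1");
        boolean assigned = tree.multi(Arrays.asList(
            ToyOp.create("/assign/worker-1/task-1"),
            ToyOp.delete("/tasks/task-1")));
        System.out.println("assigned: " + assigned);
        // A second attempt fails as a whole because /tasks/task-1 is already gone,
        // so the create in the same block leaves no trace either.
        boolean again = tree.multi(Arrays.asList(
            ToyOp.create("/assign/worker-2/task-1"),
            ToyOp.delete("/tasks/task-1")));
        System.out.println("assigned again: " + again
            + ", stray assignment: " + tree.exists("/assign/worker-2/task-1"));
    }
}
```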

Another feature that multiop offers is the possibility of checking the version of a znode to enable operations over multiple znodes that read the state of ZooKeeper and write back some data, possibly a modification of what has been read. The version of the znode that is checked does not change, so this call enables a multiop that checks the version of a znode that is not modified. This feature is useful when the changes to one or more znodes are conditional upon the version of another znode. Say that in our master–worker example, the master needs to have the clients adding new tasks under a path that the master specifies. For example, the master could ask clients to create new tasks as children of /tasks-mid, where mid is the master identifier. The master stores this path as the data of the /master-path znode. A client that needs to add a new task first reads /master-path and obtains its current version from the returned Stat. Next, the client creates a new task znode under /tasks-mid as part of a multiop call, and it also checks that the version of /master-path matches the one it has read.

The signature of check is similar to that of setData, but it doesn’t include data:

public static Op check(String path, int version);

If the version of the znode in the given path does not match, the multi call fails. To illustrate, this is roughly how the code would look if we were to implement the example we have just discussed:

Stat stat = new Stat();
byte[] masterData = zk.getData("/master-path", false, stat); 1
String parent = new String(masterData); 2
String path = parent + "/task-";
...

zk.multi(Arrays.asList(Op.check("/master-path", stat.getVersion()),
                       Op.create(path,
                                 task,
                                 Ids.OPEN_ACL_UNSAFE,
                                 CreateMode.PERSISTENT_SEQUENTIAL))); 3
1

Get the data of /master-path.

2

Extract the path from the /master-path znode.

3

A multi call with two operations: one verifies the version of the master path and the other creates a task znode.

Note that if we store the path along with the master ID in /master, this scheme does not work. Each new master creates the /master znode afresh, so its version is always 0 right after creation and a version check cannot tell one master's znode from another's.

Watches as a Replacement for Explicit Cache Management

It is undesirable from the application’s perspective to have clients accessing ZooKeeper every time they need to get the data for a given znode, the list of children of a znode, or anything else related to the ZooKeeper state. Instead, it is much more efficient to have clients cache values locally and use them at will. Once such values change, of course, you want ZooKeeper to notify the clients so they can update the caches. These notifications are the same ones that we have been discussing to this point, and as before, application clients register to receive such notifications through watches. In short, these watches enable clients to cache a local version of a value (such as the data of a znode or its list of children) and to receive notifications when that value changes.

An alternative to the approach that the ZooKeeper designers have adopted would be to cache transparently on behalf of the client all ZooKeeper state it accesses and to invalidate the values transparently when there are updates to cached data. Implementing such a cache coherence scheme could be costly, however, because clients might not need to cache all ZooKeeper state they access, and servers would need to invalidate cached state nonetheless. To implement invalidation, servers would have to either keep track of the cache content for each client or broadcast invalidation requests. Both options are expensive for a large number of clients and undesirable from our perspective.

Regardless of which party manages the client cache—ZooKeeper directly or the ZooKeeper application—notifying clients of updates can be performed either synchronously or asynchronously. Synchronously invalidating state across all clients holding a copy would be inefficient, because clients often proceed at different paces and consequently slow clients would force other clients to wait. Such differences become more frequent as the size of the client population increases.

The notifications approach that the designers opted for can be perceived as an asynchronous way of invalidating ZooKeeper state on the client side. ZooKeeper queues notifications to clients, and such notifications are consumed asynchronously. This invalidation scheme is also optional; it is up to the application to decide what parts of the ZooKeeper state require invalidation for any given client. These design choices are a better match for the use cases of ZooKeeper.
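
The following is a minimal sketch of such an application-managed cache. WatchedCache is a hypothetical class, and the Supplier stands in for a read such as getData that also sets a watch; the code only illustrates the pattern of serving reads locally until a notification invalidates the cached value.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Sketch only: WatchedCache is a hypothetical helper, not a ZooKeeper class.
final class WatchedCache<T> {
    private final Supplier<T> loader; // stands in for a read that also sets a watch
    private T value;
    private boolean valid = false;
    private int loads = 0;

    WatchedCache(Supplier<T> loader) { this.loader = loader; }

    // Serve the cached value; reload only if a notification invalidated it.
    synchronized T get() {
        if (!valid) {
            value = loader.get();
            valid = true;
            loads++;
        }
        return value;
    }

    // Called from the watcher when the change notification arrives.
    synchronized void onChanged() { valid = false; }

    synchronized int loads() { return loads; }

    public static void main(String[] args) {
        AtomicReference<String> znodeData = new AtomicReference<>("v1");
        WatchedCache<String> cache = new WatchedCache<>(znodeData::get);
        cache.get();
        cache.get();           // served locally, no second load
        znodeData.set("v2");
        cache.onChanged();     // the notification invalidates the entry
        System.out.println(cache.get() + " after " + cache.loads() + " loads");
    }
}
```

Because watches are one-time triggers, the reload is also the natural place to register the next watch.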

Ordering Guarantees

There are a few important observations to keep in mind with respect to ordering when implementing applications with ZooKeeper.

Order of Writes

ZooKeeper state is replicated across all servers forming the ensemble of an installation. The servers agree upon the order of state changes and apply them using the same order. For example, if a ZooKeeper server applies a state change that creates a znode /z followed by a state change that deletes a znode /z', all servers in the ensemble must also apply these changes, and in the same order.

Servers, however, do not necessarily apply state updates simultaneously. In fact, they rarely do. Servers most likely apply state changes at different times because they proceed at different speeds, even if the hardware they run upon is fairly homogeneous. There are a number of reasons that could cause this time lag, such as operating system scheduling and background tasks.

Applying state updates at different times is typically not a problem for applications because they still perceive the same order of updates. Applications may perceive the time lag, however, if ZooKeeper state is communicated through hidden channels, as we discuss next.

Order of Reads

ZooKeeper clients always observe the same order of updates, even if they are connected to different servers. But it is possible for two clients to observe updates at different times. If they communicate outside ZooKeeper, the difference becomes apparent.

Let’s consider the following situation:

  • A client c1 updates the data of a znode /z and receives an acknowledgment.

  • Client c1 sends a message through a direct TCP connection to a client c2 saying that it has changed the state of /z.

  • Client c2 reads the state of /z but observes a state previous to the update of c1.

We call this a hidden channel because ZooKeeper doesn’t know about the clients’ extra communication. Now c2 has stale data. This situation is illustrated in Figure 4-2.

Figure 4-2. Example of the hidden channel problem

To avoid reading stale data, we advise that applications use ZooKeeper for all communication related to the ZooKeeper state. For example, to avoid the situation just described, c2 could set a watch on /z instead of receiving a direct message from c1. With a watch, c2 learns of the change to /z and eliminates the hidden channel problem.
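
The stale read can be reproduced with a toy model of two servers that apply the same ordered log at different times. The classes below are illustrative stand-ins, not ZooKeeper code, and the names serverOfC1 and serverOfC2 are assumptions made for this sketch.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of two replicas that apply the same update log at different
// times. These classes are illustrative stand-ins, not ZooKeeper code.
final class HiddenChannelSketch {
    // The agreed-upon, totally ordered sequence of values written to /z.
    static final class UpdateLog {
        private final List<String> values = new ArrayList<>();
        void append(String value) { values.add(value); }
        int size() { return values.size(); }
        String get(int index) { return values.get(index); }
    }

    // A server that has applied only a prefix of the log.
    static final class Replica {
        private final UpdateLog log;
        private int applied = 0;
        Replica(UpdateLog log) { this.log = log; }
        void catchUp() { applied = log.size(); }
        String readZ() { return applied == 0 ? null : log.get(applied - 1); }
    }

    public static void main(String[] args) {
        UpdateLog log = new UpdateLog();
        Replica serverOfC1 = new Replica(log);
        Replica serverOfC2 = new Replica(log);

        log.append("v1");
        serverOfC1.catchUp();
        serverOfC2.catchUp();

        // c1 updates /z; its server applies the change and acknowledges it.
        log.append("v2");
        serverOfC1.catchUp();

        // c1 tells c2 over a direct connection; c2's server has not applied v2 yet.
        System.out.println("c2 reads " + serverOfC2.readZ() + " (stale)");

        // Once c2's server applies the update, the read returns the new value.
        serverOfC2.catchUp();
        System.out.println("c2 reads " + serverOfC2.readZ());
    }
}
```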

Order of Notifications

ZooKeeper orders notifications with respect to other notifications and asynchronous replies, respecting the order of updates to the system state. Say that ZooKeeper orders two state updates u and u', with u' following u. Updates u and u' modify znodes /a and /b, respectively. A client c that has a watch set on /a and reads the update u' to /b receives the notification for u before it receives the response of the operation to read /b.

This ordering enables applications to use watches to implement safety properties. Say that a znode /z is created or deleted to indicate that some configuration stored in ZooKeeper is invalid. Guaranteeing that clients are notified of the creation or deletion of /z before any actual change is made to the configuration is important to make sure that clients won’t read an invalid configuration.

To make it more concrete, say that we have a znode /config that is the parent of a number of other znodes containing application configuration metadata: /config/m1, /config/m2, …, /config/mn. For the purposes of this example, it doesn’t matter what the content of the znodes actually is. Say that a master application process needs to update these nodes by invoking setData on each znode, and it can’t have a client reading a partial update to these znodes. One solution is to have the master create a /config/invalid znode before it starts updating the configuration znodes. Other clients that need to read the configuration watch /config/invalid and hold off reading the configuration znodes while the invalid znode is present. Once the invalid znode is deleted, meaning that a new valid set of configuration znodes is available, clients can proceed to read that set.

For this particular example, we could alternatively have used multiop to execute all setData operations to the /config/m[1-n] znodes atomically instead of using a znode to mark some state as partially modified. In instances in which atomicity is the problem, we can use multiop instead of relying upon an extra znode and notifications. The notification mechanism, however, is more general and is not constrained to atomicity.

Because ZooKeeper orders notifications according to the order of the state updates that trigger the notifications, clients can rely upon perceiving the true order of ZooKeeper state changes through their notifications.

Liveness versus Safety

We have used the notifications mechanism extensively for liveness in this chapter. Liveness is about making sure that the system eventually makes progress. Notifications of new tasks and new workers are examples of events related to liveness. If a master is not notified of a new task, the task will never be executed. Not executing a submitted task constitutes absence of liveness, at least from the perspective of the client that submitted the task.

This last example of atomic updates to a set of configuration znodes is different: it is about safety, not liveness. Reading the znodes while they are being updated might lead to a client reading an inconsistent configuration. The invalid znode makes sure that clients read the state only when a valid configuration is available.

For the liveness examples we have seen, the order of delivery of notifications is not particularly important. As long as clients eventually learn of those events, they will make progress. For safety, however, receiving a notification out of order might lead to incorrect behavior.

The Herd Effect and the Scalability of Watches

One issue to be aware of is that ZooKeeper triggers all watches set for a particular znode change when the change occurs. If there are 1,000 clients that have set a watch on a given znode with a call to exists, then 1,000 notifications will be sent out when the znode is created. A change to a watched znode might consequently generate a spike of notifications. Such a spike could affect, for example, the latency of operations submitted around the time of the spike. When possible, we recommend avoiding such a use of ZooKeeper in which a large number of clients watch for a change to a given znode. It is much better to have only a few clients watching any given znode at a time, and ideally at most one.

One way around this problem that doesn’t apply in every case but might be useful in some is the following. Say that n clients are competing to acquire a lock (e.g., a master lock). To acquire the lock, a process simply tries to create the /lock znode. If the znode exists, the client watches the znode for deletion. When the znode is deleted, the client tries again to create /lock. With this strategy, all clients watching /lock receive a notification when /lock is deleted.

A different approach is to have each client create a sequential znode /lock/lock-. Recall that ZooKeeper adds a sequence number to the znode, automatically making it /lock/lock-xxx, where xxx is a sequence number. We can use the sequence number to determine which client acquires the lock by granting it to the client that created the znode under /lock with the smallest sequence number. In this scheme, a client determines if it has the smallest sequence number by getting the children of /lock with getChildren. If the client doesn’t have the smallest sequence number, it watches the znode that immediately precedes its own in the order determined by the sequence numbers. For example, say we have three znodes: /lock/lock-001, /lock/lock-002, and /lock/lock-003. In this example:

  • The client that created /lock/lock-001 has the lock.

  • The client that created /lock/lock-002 watches /lock/lock-001.

  • The client that created /lock/lock-003 watches /lock/lock-002.

This way each znode has at most one client watching it.
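
The decision each client makes can be sketched as follows. LockQueueSketch is a hypothetical helper, not a ZooKeeper class; it takes the child names that getChildren would return for /lock together with the name of the client's own znode, and returns the znode to watch, or nothing if the client holds the lock.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

// Sketch only: LockQueueSketch is a hypothetical helper, not a ZooKeeper class.
final class LockQueueSketch {
    // Extract the numeric suffix that ZooKeeper appends, e.g. "lock-002" -> 2.
    static long sequenceOf(String child) {
        return Long.parseLong(child.substring(child.lastIndexOf('-') + 1));
    }

    // Returns the child this client should watch, or empty if it holds the lock.
    static Optional<String> nodeToWatch(List<String> children, String myNode) {
        List<String> sorted = new ArrayList<>(children);
        sorted.sort(Comparator.comparingLong(LockQueueSketch::sequenceOf));
        int myIndex = sorted.indexOf(myNode);
        if (myIndex < 0) {
            throw new IllegalArgumentException(myNode + " is not a child of /lock");
        }
        return myIndex == 0 ? Optional.empty() : Optional.of(sorted.get(myIndex - 1));
    }

    public static void main(String[] args) {
        // getChildren makes no ordering promise, so the list is sorted first.
        List<String> children = Arrays.asList("lock-003", "lock-001", "lock-002");
        for (String node : children) {
            System.out.println(node + " -> " + nodeToWatch(children, node)
                .map(n -> "watches " + n).orElse("holds the lock"));
        }
    }
}
```

When the watched znode is deleted, the client would list the children again and repeat the check, because the predecessor may have gone away without ever holding the lock.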

Another dimension to be aware of is the state generated with watches on the server side. Setting a watch creates a Watcher object on the server. According to the YourKit profiler, setting a watch adds around 250 to 300 bytes to the amount of memory consumed by the watch manager of a server. Having a very large number of watches implies that the watch manager consumes a nonnegligible amount of server memory. For example, having 1 million outstanding watches gives us a ballpark figure of 0.3 GB. Consequently, a developer must be mindful of the number of watches outstanding at any time.
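
The ballpark figure is a simple multiplication, shown here as a small sketch. WatchMemoryEstimate is a hypothetical helper; the per-watch sizes are the figures quoted above, and the result is in decimal gigabytes.

```java
// Back-of-the-envelope estimate; the per-watch sizes are the figures quoted above.
final class WatchMemoryEstimate {
    // Memory consumed by the given number of watches, in decimal gigabytes.
    static double gigabytes(long watches, long bytesPerWatch) {
        return watches * bytesPerWatch / 1e9;
    }

    public static void main(String[] args) {
        long watches = 1_000_000L;
        System.out.println("low estimate (GB):  " + gigabytes(watches, 250));
        System.out.println("high estimate (GB): " + gigabytes(watches, 300));
    }
}
```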

Takeaway Messages

In a distributed system, there are many events that trigger actions. ZooKeeper provides efficient mechanisms for keeping track of important events that require processes in the system to react. Examples we have discussed here are related to the regular flow of applications (e.g., execution of tasks) or crash faults (e.g., master crashes).

One key ZooKeeper feature that we have used is notifications. ZooKeeper clients register watches with ZooKeeper to receive notifications upon changes to the ZooKeeper state. The order in which notifications are delivered is important; clients must not observe different orders for the changes to the ZooKeeper state.

One particular feature that is useful when dealing with changes is the multi call. It enables multiple operations to be executed in a block and often avoids race conditions in distributed applications when clients are reacting to events and changing the ZooKeeper state.

We expect most applications to follow the pattern we present here, although variants are of course possible and acceptable. We have focused on the asynchronous API because we encourage developers to use it. The asynchronous API enables applications to use ZooKeeper resources more efficiently and to obtain higher performance.
