The handleWorkerPayloads method processes the incoming payloads from a single worker. On each iteration, it blocks until either a new payload arrives from that worker or the job context is canceled; in the latter case, the method returns immediately:
func (c *masterJobCoordinator) handleWorkerPayloads(workerIndex int, worker *remoteWorkerStream, graph *bspgraph.Graph) { var wPayload *proto.WorkerPayload for { select { case wPayload = <-worker.RecvFromWorkerChan(): case <-c.jobCtx.Done(): return } if relayMsg := wPayload.GetRelayMessage(); relayMsg != nil { c.relayMessageToWorker(workerIndex, relayMsg) } else if stepMsg := wPayload.GetStep(); stepMsg != nil { updatedStep, err := c.barrier.Wait(stepMsg) if err != nil { c.cancelJobCtx() return } c.sendToWorker(worker, ...