Chapter 4. Knative Eventing

In this chapter, we present recipes that will help you get started with Knative Eventing. We will start with a high-level overview of the usage patterns and then drill down into specific steps to connect the various components together into end-to-end working examples.

As previously described in Chapter 1, Knative has two major subprojects: Serving and Eventing. With Serving you have dynamic autoscaling, including scaling down to zero pods, based on the absence of HTTP traffic load. With Eventing, you now have that same autoscaling capability but bridged into other protocols or from other sources beyond HTTP. For example, a barrage of messages flowing through an Apache Kafka topic can cause autoscaling of your Kubernetes-based service to handle those messages. Or perhaps a scheduled event via cron can cause your service to awake from its slumber and perform its duties.

Usage Patterns

There are three primary usage patterns with Knative Eventing:

Source to Sink

Source to Sink provides the simplest getting started experience with Knative Eventing. It provides single Sink—that is, event receiving service—with no queuing, backpressure, and filtering. Source to Sink does not support replies, which means the response from the Sink Service is ignored. As shown in Figure 4-1, the responsibility of the Event Source is just to deliver the message without waiting for the response from the Sink; hence, it will be appropriate to compare Source to Sink to the fire and forget messaging pattern.

Source to Sink
Figure 4-1. Source to Sink
Channels and Subscriptions

With Channels and Subscriptions, the Knative Eventing system defines a Channel, which can connect to various backends such as In-Memory, Kafka, and GCP PubSub for sourcing the events. Each Channel can have one or more Subscribers in the form of Sink Services as shown in Figure 4-2, which can receive the event messages and process them as needed. Each message from the Channel is formatted as a CloudEvent and sent further up in the chain to other Subscribers for further processing. The Channels and Subscriptions usage pattern does not have the ability to filter messages.

Channels and Subscriptions
Figure 4-2. Channels and Subscriptions
Brokers and Triggers

Brokers and Triggers are similar to Channels and Subscriptions, except that they support filtering of events. Event filtering is a method that allows the Subscribers to show an interest in a certain set of messages that flows into the Broker. For each Broker, Knative Eventing will implicitly create a Knative Eventing Channel. As shown in Figure 4-3, the Trigger gets itself subscribed to the Broker and applies the filter on the messages on its subscribed Broker. The filters are applied on the CloudEvent attributes of the messages, before delivering the message to the interested Sink Services (Subscribers).

Brokers and Triggers
Figure 4-3. Brokers and Triggers

Before You Begin

All the recipes in this chapter will be executed from the directory $BOOK_HOME/eventing, so change to the recipe directory by running:

$ cd $BOOK_HOME/eventing

The recipes in this chapter will be deployed in the chapter-4 namespace, so switch to the chapter-4 namespace with the following command:

$ kubectl config set-context --current --namespace=chapter-4

The recipes in this chapter will enable us to do eventing with Knative and will help us in understanding how Knative Serving Services can respond to external events via Knative Eventing.

4.1 Producing Events with Eventing Sources

Problem

You need a way to connect to and receive events into your application.

Solution

Knative Eventing Sources are software components that emit events. The job of a Source is to connect to, drain, capture, and potentially buffer events, often from an external system, and then relay those events to the Sink.

Knative Eventing Sources install the following four sources out-of-the-box:

$ kubectl api-resources --api-group=sources.eventing.knative.dev
NAME              APIGROUP                      NAMESPACED   KIND
apiserversources  sources.eventing.knative.dev  true         ApiServerSource
containersources  sources.eventing.knative.dev  true         ContainerSource
cronjobsources    sources.eventing.knative.dev  true         CronJobSource
sinkbindings      sources.eventing.knative.dev  true         SinkBinding

Discussion

The ApiServerSource allows you to listen in on Kubernetes API events, like those events provided by kubectl get events.

The ContainerSource allows you to create your own container that emits events which can be targeted at Sinks—your specific Service.

The CronJobSource allows you to specify a cron timer, a recurring task that will emit an event to your Sink on a periodic basis. The CronJobSource is often the easiest way to verify that Knative Eventing is working properly.

SinkBindings allows you to link any addressable Kubernetes resource to receive events from any other Kubernetes resource that may produce events.

There are many other Source types, and you can review the current list of Sources within the Knative documentation.

Before we take deep dive into the recipes in this chapter, let’s quickly understand the structure of a Knative Event Source resource YAML:

apiVersion: sources.eventing.knative.dev/v1alpha1
kind: CronJobSource 1
metadata:
  name: eventinghello-cronjob-source
spec: 2
  schedule: "*/2 * * * *"
  data: '{"key": "every 2 mins"}'
  sink: 3
    ref:
      apiVersion: serving.knative.dev/v1alpha1
      kind: Service
      name: eventinghello
1

Knative Sources are described as CRDs; therefore, you construct an artifact with the correct kind

2

spec will be unique per Source, per kind

3

sink will be described next

4.2 Receiving Events with Knative Eventing Sinks

Problem

You need to connect your custom service to the events from an Event Source.

Solution

Knative Eventing Sink is how you specify the event receiver—that is, the consumer of the event. Sinks can be invoked directly in a point-to-point fashion by referencing them via the Event Source’s sink as shown here:

apiVersion: sources.eventing.knative.dev/v1alpha1
kind: CronJobSource
metadata:
  name: eventinghello-cronjob-source
spec:
  schedule: "*/2 * * * *"
  data: '{"key": "every 2 mins"}'
  sink: 1
    ref:
      apiVersion: serving.knative.dev/v1alpha1 2
      kind: Service
      name: eventinghello 3
1

sink can target any Kubernetes Service or

2

a Knative Serving Service

3

deployed as “eventinghello”

Discussion

The Sinks can target one of your Services—your code that will receive an HTTP POST with a CloudEvent payload. However, the Sink is also very flexible; it might point to a Channel (see Recipe 4.9 or a Broker (see Recipe 4.10), allowing for a publish-subscribe messaging pattern with one or more potential receivers. The Sink is often your Knative or Kubernetes Service that wishes to react to a particular event.

4.3 Deploying a Knative Eventing Service

Problem

Your Knative or Kubernetes Service needs to receive input from Knative Eventing in a generic fashion, as events may come from many potential sources.

Solution

Your code will handle an HTTP POST as shown in the following listing, where the CloudEvent data is available as HTTP headers as well as in the body of the request:

  @PostMapping("/")
  public ResponseEntity<String> myPost (
    HttpEntity<String> http) {

    System.out.println("ce-id=" + http.getHeaders().get("ce-id"));
    System.out.println("ce-source=" + http.getHeaders().get("ce-source"));
    System.out.println("ce-specversion=" + http.getHeaders()
                                               .get("ce-specversion"));
    System.out.println("ce-time=" + http.getHeaders().get("ce-time"));
    System.out.println("ce-type=" + http.getHeaders().get("ce-type"));
    System.out.println("content-type=" + http.getHeaders().getContentType());
    System.out.println("content-length=" + http.getHeaders().getContentLength());

    System.out.println("POST:" + http.getBody());
  }

The CloudEvent SDK provides a class library and framework integration for various language runtimes such as Go, Java, and Python.

Additional details on the CloudEvent to HTTP mapping can be found in the CloudEvent GitHub repository.

The following listing shows a simple Knative Service (Sink):

apiVersion: serving.knative.dev/v1alpha1
kind: Service
metadata:
  name: eventinghello
spec:
  template:
    metadata:
      name: eventinghello-v1
      annotations:
        autoscaling.knative.dev/target: "1"1
    spec:
      containers:
      - image: quay.io/rhdevelopers/eventinghello:0.0.1
1

A concurrency of 1 HTTP request (an event) is consumed at a time. Most applications/services can easily handle many events concurrently and Knative’s out-of-the-box default is 100. For the purposes of experimentation, it is interesting to see the behavior when you use 1 as the autoscaling target.

Discussion

You can deploy and verify that the eventinghello Sink Service has been deployed successfully by looking for READY marked as True:

$ kubectl -n chapter-4 apply -f eventing-hello-sink.yaml
service.serving.knative.dev/eventinghello created
$ kubectl get ksvc
NAME           URL                                          READY
eventinghello  http://eventinghello.myeventing.example.com  True

The default behavior of Knative Serving is that the very first deployment of a Knative Serving Service will automatically scale up to one pod, and after about 90 seconds it will autoscale down to zero pods.

You can actively watch the pod lifecycle with the following command:

$ watch kubectl get pods

You can monitor the logs of the eventinghello pod with:

$ stern eventinghello -c user-container

Wait until eventinghello scales to zero pods before moving on.

4.4 Connecting a Source to the Service

Problem

You have a Knative Serving Service (Sink) and need to connect it to a Knative Eventing Source to test its autoscaling behavior.

Solution

Deploy a CronJobSource, as it is the easiest solution to verify if Knative Eventing is responding to events correctly. To deploy a CronJobSource, run the following command:

$ kubectl -n chapter-4 apply -f eventinghello-source.yaml
cronjobsource.sources.eventing.knative.dev/eventinghello-cronjob-source created
$ kubectl -n chapter-4 get cronjobsource
NAME                           READY   AGE
eventinghello-cronjob-source   True    10s

Discussion

The deployment of a CronJobSource also produces a pod with a prefix of “cronjobsource-eventinghell” as shown:

$ watch kubectl get pods
NAME                                            READY   STATUS    AGE
cronjobsource-eventinghello-54b9ef12-2c2f-11ea   1/1     Running   14s

Based on our cron expression, after two minutes it will kick off an event that will cause the eventinghello pod to scale up as shown the following listing:

$ watch kubectl get pods
NAME                                          READY STATUS   AGE
cronjobsource-eventinghell-54b9ef12-2c2f-11ea 1/1   Running  97s
eventinghello-v1-deployment-7cfcb664ff-r694p  2/2   Running  10s

After approximately 60 seconds, the eventinghello will autoscale down to zero pods, as it is a Knative Serving Service that will only be available while it is actively receiving events:

$ watch kubectl get pods
NAME                                          READY STATUS      AGE
cronjobsource-eventinghell-54b9ef12-2c2f-11ea 1/1   Running     2m28s
eventinghello-v1-deployment-7cfcb664ff-r694p  2/2   Terminating 65s

You can follow logs to see the CloudEvent details by using stern:

$ stern eventinghello -c user-container
ce-id=a1e0cbea-8f66-4fa6-8f3c-e5590c4ee147
ce-source=/apis/v1/namespaces/chapter-5/cronjobsources/
eventinghello-cronjob-source
ce-specversion=1.0
ce-time=2020-01-01T00:44:00.000889221Z
ce-type=dev.knative.cronjob.event
content-type=application/json
content-length=22
POST:{"key":"every 2 mins"}

Finally, when you are done with experimentation, simply delete the source and service:

$ kubectl -n chapter-4 delete -f eventinghello-source.yaml
cronjobsource.sources.eventing.knative.dev "eventinghello-cronjob-source" deleted
$ kubectl -n chapter-4 delete -f eventing-hello-sink.yaml
service.serving.knative.dev "eventinghello" deleted
$ kubectl get pods -n chapter-4
No resources found.

4.5 Deploying an Apache Kafka Cluster

Problem

You need to deploy an Apache Kafka cluster.

Solution

One of the easiest ways to deploy an Apache Kafka cluster is to use Strimzi, an operator and set of CRDs that deploys Apache Kafka inside of a Kubernetes cluster.

Discussion

As part of the upcoming recipes in this chapter, we will be deploying a Knative Source (see Recipe 4.6) that will respond to Apache Kafka Topic messages (events). Before getting to those recipes, we need to first deploy Apache Kafka inside your Kubernetes cluster. The strimzi Kubernetes operator can be used to deploy the Apache Kafka cluster in your Kubernetes cluster.

Run the following command to create the kafka namespace and deploy Apache Kafka into it:

$ kubectl create namespace kafka
$ curl -L \
https://github.com/strimzi/strimzi-kafka-operator\
/releases/download/0.16.2/strimzi-cluster-operator-0.16.2.yaml \
  | sed 's/namespace: .*/namespace: kafka/' \
  | kubectl apply -f - -n kafka

Wait for the strimzi-cluster-operator to be running:

$ watch kubectl get pods -n kafka
NAME                                        READY STATUS    AGE
strimzi-cluster-operator-85f596bfc7-7dgds   1/1   Running   1m2s

The strimzi operator would have installed several Apache Kafka–related CRDs, which can be used to create Apache Kafka core resources such as a topic, users, connectors, etc. You can verify the CRDs that are available by querying api-resources:

$ kubectl api-resources --api-group=kafka.strimzi.io
kafkabridges.kafka.strimzi.io                        2019-12-28T14:53:14Z
kafkaconnects.kafka.strimzi.io                       2019-12-28T14:53:14Z
kafkaconnects2is.kafka.strimzi.io                    2019-12-28T14:53:14Z
kafkamirrormakers.kafka.strimzi.io                   2019-12-28T14:53:14Z
kafkas.kafka.strimzi.io                              2019-12-28T14:53:14Z
kafkatopics.kafka.strimzi.io                         2019-12-28T14:53:14Z
kafkausers.kafka.strimzi.io                          2019-12-28T14:53:14Z

Now with the Apache Kafka operator running, you can deploy and verify a single-node Apache Kakfa cluster by running the command:

$ kubectl -n kafka apply -f kafka-broker-my-cluster.yaml
kafka.kafka.strimzi.io/my-cluster created
$ watch kubectl get pods -n kafka
NAME                                         READY  STATUS   AGE
my-cluster-entity-operator-7d677bdf7b-jpws7  3/3    Running  85s
my-cluster-kafka-0                           2/2    Running  110s
my-cluster-zookeeper-0                       2/2    Running  2m22s
strimzi-cluster-operator-85f596bfc7-7dgds    1/1    Running  4m22s

The Kubernetes CRD resource $BOOK_HOME/eventing/kafka-broker-my-cluster.yaml will deploy a single Zookeeper, Kafka Broker, and an Entity-Operator. The Entity-Operator is responsible for managing different custom resources such as KafkaTopic and KafkaUser.

Now that you have an Apache Kafka cluster deployed, you can create a Kafka Topic using the KafkaTopic CRD. The following listing shows how to create a Kafka Topic named my-topic:

apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaTopic
metadata:
  name: my-topic
  labels:
    strimzi.io/cluster: my-cluster
spec:
  partitions: 10 1
  replicas: 1
1

partitions: n allows for more concurrent scale-out of Sink pods. In theory, up to 10 pods will scale-up if there are enough messages flowing through the Kafka Topic.

Note

You can choose to skip the manual pre-creation of a KafkaTopic but the automatically generated topics will have partitions set to 1 by default.

Create and verify the topic:

$ kubectl -n kafka create -f kafka-topic-my-topic.yaml
kafkatopic.kafka.strimzi.io/my-topic created
$ kubectl -n kafka  get kafkatopics
NAME       PARTITIONS   REPLICATION FACTOR
my-topic   10           1

Verify that your Kafka Topic is working correctly by connecting a simple producer and consumer and creating some test messages. The sample code repository includes a script for producing Kafka messages called kafka-producer.sh. Execute the script and type in one, two, three, hitting Enter/Return after each string:

Producer

$ $BOOK_HOME/bin/kafka-producer.sh
>one
>two
>three

Consumer

You should also leverage the sample code repository’s kafka-consumer.sh script to see the message flow through the topic. Open a new terminal and run:

$ $BOOK_HOME/bin/kafka-consumer.sh
one
two
three

You can use Ctrl-C to stop producer and consumer interaction and their associated pods.

4.6 Sourcing Apache Kafka Events with Knative Eventing

Problem

You wish to connect to an Apache Kafka cluster and have those messages flow through Knative Eventing.

Solution

Use the Knative Eventing KafkaSource to have the Kafka messages flow through the Knative Eventing Channels. You can deploy the Knative KafkaSource by running the command:

$ kubectl apply \
-f https://github.com/knative/eventing-contrib/\
releases/download/v0.12.2/kafka-source.yaml

The previous step deploys the Knative KafkaSource in the knative-sources namespace as well as a CRD, ServiceAccount, ClusterRole, etc. Verify that the KnativeSource namespace includes the kafka-controller-manager-0 pod:

$ watch kubectl get pods -n knative-sources
NAME                         READY   STATUS    AGE
kafka-controller-manager-0   1/1     Running   1m17s

You should also deploy the Knative Kafka Channel that can be used to connect the Knative Eventing Channel with an Apache Kafka cluster backend. To deploy a Knative Kafka Channel, run:

$ curl -L "https://github.com/knative/eventing-contrib/\
releases/download/v0.12.2/kafka-channel.yaml" \
 | sed 's/REPLACE_WITH_CLUSTER_URL/my-cluster-kafka-bootstrap.kafka:9092/' \
 | kubectl apply --filename -
Note

“my-cluster-kafka-bootstrap.kafka:9092” comes from kubectl get services -n kafka.

Discussion

Look for three new pods in the knative-eventing namespace with the prefix “kafka”:

$ watch kubectl get pods -n knative-eventing
NAME                                   READY   STATUS    AGE
eventing-controller-666b79d867-kq8cc   1/1     Running   64m
eventing-webhook-5867c98d9b-hzctw      1/1     Running   64m
imc-controller-7c4f9945d7-s59xd        1/1     Running   64m
imc-dispatcher-7b55b86649-nsjm2        1/1     Running   64m
kafka-ch-controller-7c596b6b55-fzxcx   1/1     Running   33s
kafka-ch-dispatcher-577958f994-4f2qs   1/1     Running   33s
kafka-webhook-74bbd99f5c-c84ls         1/1     Running   33s
sources-controller-694f8df9c4-pss2w    1/1     Running   64m

And you should also find some new api-resources as shown here:

$ kubectl api-resources --api-group=sources.eventing.knative.dev
NAME               APIGROUP                       NAMESPACED  KIND
apiserversources   sources.eventing.knative.dev   true        ApiServerSource
containersources   sources.eventing.knative.dev   true        ContainerSource
cronjobsources     sources.eventing.knative.dev   true        CronJobSource
kafkasources       sources.eventing.knative.dev   true        KafkaSource
sinkbindings       sources.eventing.knative.dev   true        SinkBinding

$kubectl api-resources --api-group=messaging.knative.dev
NAME              SHORTNAMES  APIGROUP                NAMESPACED  KIND
channels          ch          messaging.knative.dev   true        Channel
inmemorychannels  imc         messaging.knative.dev   true        InMemoryChannel
kafkachannels     kc          messaging.knative.dev   true        KafkaChannel
parallels                     messaging.knative.dev   true        Parallel
sequences                     messaging.knative.dev   true        Sequence
subscriptions     sub         messaging.knative.dev   true        Subscription

Now that all of your infrastructure is configured, you can deploy the Knative Serving Service (Sink) by running the command:

$ kubectl apply -n chapter-5 -f eventing-hello-sink.yaml
service.serving.knative.dev/eventinghello created
$ kubectl get ksvc
NAME            URL                                      READY
eventinghello   http://eventinghello.kafka.example.com   True

Make sure to follow the logs using stern:

$ stern eventinghello -c user-container

The initial deployment of eventinghello will cause it to scale up to one pod. It will be around until it hits its scale-down time limit. Allow it to scale down to zero pods before continuing.

Create a KafkaSource for my-topic by connecting your Kafka Topic my-topic to eventinghello:

apiVersion: sources.eventing.knative.dev/v1alpha1
kind: KafkaSource
metadata:
  name: mykafka-source
spec:
  consumerGroup: knative-group
  bootstrapServers: my-cluster-kafka-bootstrap.kafka:9092 1
  topics: my-topic
  sink: 2
    ref:
      apiVersion: serving.knative.dev/v1alpha1
      kind: Service
      name: eventinghello
1

“my-cluster-kafka-bootstrap.kafka:9092” can be found via kubectl get -n kafka services

2

This is another example of a direct Source to Service

The deployment of KafkaSource will result in a new pod prefixed with “mykafka-source”:

$ kubectl -n chapter-4 apply -f mykafka-source.yaml
kafkasource.sources.eventing.knative.dev/mykafka-source created
$ watch kubectl get pods
NAME                                          READY  STATUS   RESTARTS  AGE
mykafka-source-vxs2k-56548756cc-j7m7v         1/1    Running  0         11s
Important

Since we had test messages of “one,” “two,” and “three” from earlier, you might see the eventinghello service awaken to process those messages.

Wait for eventinghello to scale down to zero pods before moving on, and then push more Kafka messages into my-topic.

Let’s now start an Apache Kafka producer that will send a message to my-topic:

$ $BOOK_HOME/bin/kafka-producer.sh

And then enter the following JSON-formatted messages:

{"hello":"world"}

{"hola":"mundo"}

{"bonjour":"le monde"}

{"hey": "duniya"}
Note

Knative Eventing events through the KafkaSource must be JSON-formatted.

While making sure to monitor the logs of the eventinghello pod:

$ stern eventinghello -c user-container
ce-id=partition:1/offset:1
ce-source=/apis/v1/namespaces/kafka/kafkasources/mykafka-source#my-topic
ce-specversion=1.0
ce-time=2020-01-01T01:16:12.886Z
ce-type=dev.knative.kafka.event
content-type=application/json
content-length=17
POST:{"hey": "duniya"}
Note

The sample output has been modified for readability and formatting reasons. You can see the logging output of all your JSON-based event input in the terminal where you are watching the eventinghello logs.

4.7 Autoscaling with Apache Kafka and Knative Eventing

Problem

Now that you have a connected a Kafka Topic to Knative Eventing, you wish to see it scale out to greater than a single pod.

Solution

You simply need to set the autoscaling target low enough by adding the annotation autoscaling.knative.dev/target: "1", while simultaneously pushing enough messages through the topic. You have already set the target to be 1 when deploying the eventinghello sink as shown in the following listing:

apiVersion: serving.knative.dev/v1alpha1
kind: Service
metadata:
  name: eventinghello
spec:
  template:
    metadata:
      name: eventinghello-v1
      annotations:
        autoscaling.knative.dev/target: "1" 1
    spec:
      containers:
      - image: quay.io/rhdevelopers/eventinghello:0.0.1
1

The Knative Serving Sink Service was defined with the autoscaling annotation that limits concurrency to approximately one pod per event (Kafka message)

Discussion

With a concurrency factor of 1, if you are able to push in a number of Kafka message rapidly, you will see more than one eventinghello pod scaled up to handle the load.

You simply need an application that allows you to push in messages rapidly. Launch the Kafka Spammer application and push in at least three messages, then run the following command to start the kafka-spammer pod:

$ kubectl -n kafka run kafka-spammer \
  --image=quay.io/rhdevelopers/kafkaspammer:1.0.2

You then exec into the running kafka-spammer pod by running the following command:

$ KAFKA_SPAMMER_POD=$(kubectl -n kafka get pod -l "run=kafka-spammer" \
 -o jsonpath={.items[0].metadata.name})
$ kubectl -n kafka exec -it $KAFKA_SPAMMER_POD -- /bin/sh

Use curl to send in three messages:

$ curl localhost:8080/3

You should see about three eventinghello pods coming to life, as shown in the following listing:

$ watch kubectl get pods
NAME                                          READY STATUS   AGE
eventinghello-v1-deployment-65c9b9c7df-8rwqc  1/2   Running  6s
eventinghello-v1-deployment-65c9b9c7df-q7pcf  1/2   Running  4s
eventinghello-v1-deployment-65c9b9c7df-zht2t  1/2   Running  6s
kafka-spammer-77ccd4f9c6-sx5j4                1/1   Running  26s
my-cluster-entity-operator-7d677bdf7b-jpws7   3/3   Running  27m
my-cluster-kafka-0                            2/2   Running  27m
my-cluster-zookeeper-0                        2/2   Running  28m
mykafka-source-vxs2k-56548756cc-j7m7v         1/1   Running  5m12s
strimzi-cluster-operator-85f596bfc7-7dgds     1/1   Running  30m
Note

The events are not being evenly distributed across the various eventinghello pods; the first pod up starts consuming them all immediately.

To close out the spammer, use exit and then delete its deployment:

$ kubectl delete -n kafka deployment kafka-spammer

4.8 Using a Kafka Channel as the Default Knative Channel

Problem

You want to use Apache Kafka as the default Channel backend for Knative Eventing.

Solution

Persistence and Durability are two very important features of any messaging-based architecture. The Knative Channel has built-in support for durability. Durability of messages becomes ineffective if the Knative Eventing Channel does not support persistence. Without persistence, it will not be able to deliver the messages to Subscribers that might be offline at the time of message delivery.

By default all Knative Channels created by the Knative Eventing API use InMemoryChannel (IMC), which does not have the capability to persist messages. To enable persistence we need to use one of the supported Channels such as GCP PubSub, Kafka, or Neural Autonomic Transport System (NATS) as the default Knative Channel backend.

We installed Apache Kafka earlier in Recipe 4.6. Let’s now configure it to be the default Knative Channel backend:

apiVersion: v1
kind: ConfigMap
metadata:
  name: default-ch-webhook
  namespace: knative-eventing
data:
  default-ch-config: |
    clusterDefault: 1
      apiVersion: messaging.knative.dev/v1alpha1
      kind: InMemoryChannel
    namespaceDefaults: 2
      chapter-4:
        apiVersion: messaging.knative.dev/v1alpha1
        kind: KafkaChannel
        spec:
          numPartitions: 1
          replicationFactor: 1
1

For the cluster we will still use the default InMemoryChannel

2

For the namespace chapter-4, all Knative Eventing Channels will use KafkaChannel as the default

Run the following command to apply the Knative Eventing Channel configuration:

$ kubectl apply -f default-kafka-channel.yaml

Discussion

Since you have now made all Knative Eventing Channels of chapter-4 to be KafkaChannel, creating a Knative Eventing Channel in the chapter-4 namespace will result in a corresponding Kafka Topic being created. Let’s now verify it by creating a sample Channel as shown in the following listing:

cat <<EOF | kubectl apply -f -
apiVersion: messaging.knative.dev/v1alpha1
kind: Channel
metadata:
  name: my-events-ch
  namespace: chapter-4
spec: {}
EOF

When you now list the topics that are available in Kafka using the script $BOOK_HOME/bin/kafka-list-topics.sh, you should see a topic corresponding to your Channel my-events-ch:

$ $BOOK_HOME/bin/kafka-list-topics.sh
knative-messaging-kafka.chapter-4.my-events-ch

For each Knative Eventing Channel that you create, a Kafka Topic will be created. The topic’s name will follow a convention like knative-messaging-kafka.<your-channel-namespace>.<your-channel-name>.

4.9 Using Knative Channels and Subscriptions

Problem

You would like to have multiple Sinks with potentially many services responding to events.

Solution

Use Knative Eventing Channels and Subscriptions:

Channels

Channels are an event-forwarding and persistence layer where each Channel is a separate Kubernetes Custom Resource. A Channel may be backed by Apache Kafka or InMemoryChannel.

Subscriptions

Subscriptions are how you register your service to listen to a particular channel.

The use of Channels and Subscriptions allows you to decouple the producers and consumers of events.

The recipe is as follows:

  1. Create a Channel

  2. Create a Source to Sink to the Channel

  3. Create at least two Sink Services

  4. Create Subscriptions to register your Sink Services with the Channel

Create a Channel:

apiVersion: messaging.knative.dev/v1alpha1
kind: Channel
metadata:
  name: eventinghello-ch

Verify that your Channel was created successfully:

$ kubectl -n chapter-4 create -f eventinghello-channel.yaml
channel.messaging.knative.dev/eventinghello-ch created
$ kubectl get ch
NAME             READY
eventinghello-ch True
URL
http://eventinghello-ch-kn-channel.chapter-5.svc.cluster.local

Then you need to create a Source that will send events to the Channel:

apiVersion: sources.eventing.knative.dev/v1alpha1
kind: CronJobSource
metadata:
  name: my-cjs
spec:
  schedule: "*/2 * * * *"
  data: '{"message": "From CronJob Source"}'
  sink:
   ref:
    apiVersion: messaging.knative.dev/v1alpha1 1
    kind: Channel 2
    name: eventinghello-ch
1

The Channel API is in the api-group messaging.eventing.knative.dev

2

kind is a Channel instead of direct to a specific service; the default is an InMemoryChannel implementation

Deploy and verify that your CronJobSource is running:

$ kubectl -n chapter-4 create -f eventinghello-source-ch.yaml
cronjobsource.sources.eventing.knative.dev/my-cjs created
$ kubectl -n chapter-4 get cronjobsource
NAME     READY   AGE
my-cjs   True    8s

Now you create the Sink services that will become the Subscribers:

apiVersion: serving.knative.dev/v1alpha1
kind: Service
metadata:
  name: eventinghelloa
spec:
  template:
    metadata:
      name: eventinghelloa-v1 1
      annotations:
        autoscaling.knative.dev/target: "1"
    spec:
      containers:
      - image: quay.io/rhdevelopers/eventinghello:0.0.1
1

The string eventinghelloa will help you identify this particular service:

apiVersion: serving.knative.dev/v1alpha1
kind: Service
metadata:
  name: eventinghellob
spec:
  template:
    metadata:
      name: eventinghellob-v1 1
      annotations:
        autoscaling.knative.dev/target: "1"
    spec:
      containers:
      - image: quay.io/rhdevelopers/eventinghello:0.0.1
1

The string eventinghellob will help you identify this particular service:

$ kubectl -n chapter-4 create -f eventing-helloa-sink.yaml
service.serving.knative.dev/eventinghelloa created
$ kubectl -n chapter-4 create -f eventing-hellob-sink.yaml
service.serving.knative.dev/eventinghellob created

Now create the appropriate Subscription for eventinghelloa to the Channel eventinghello-ch:

apiVersion: messaging.knative.dev/v1alpha1
kind: Subscription
metadata:
  name: eventinghelloa-sub
spec:
  channel:
    apiVersion: messaging.knative.dev/v1alpha1
    kind: Channel
    name: eventinghello-ch
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1alpha1
      kind: Service
      name: eventinghelloa

And create the appropriate Subscription for eventinghellob to the Channel eventinghello-ch:

apiVersion: messaging.knative.dev/v1alpha1
kind: Subscription
metadata:
  name: eventinghellob-sub
spec:
  channel:
    apiVersion: messaging.knative.dev/v1alpha1
    kind: Channel
    name: eventinghello-ch
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1alpha1
      kind: Service
      name: eventinghellob
$ kubectl -n chapter-4 create -f eventing-helloa-sub.yaml
subscription.messaging.knative.dev/eventinghelloa-sub created
$ kubectl -n chapter-4 create -f eventing-hellob-sub.yaml
subscription.messaging.knative.dev/eventinghellob-sub created

Discussion

If you wait approximately two minutes for the CronJobSource, you will see both eventinghelloa and eventinghellob begin to run:

$ watch kubectl get pods
NAME                                                      READY STATUS  AGE
cronjobsource-my-cjs-93544f14-2bf9-11ea-83c7-08002737670c 1/1   Running 2m15s
eventinghelloa-1-deployment-d86bf4847-hvbk6               2/2   Running 5s
eventinghellob-1-deployment-5c986c7586-4clpb              2/2   Running 5s

Once you are done with your experimentation, you can delete the event source my-cjs and eventinghelloa and eventinghellob:

$ kubectl -n chapter-4 delete -f eventing-helloa-sink.yaml
$ kubectl -n chapter-4 delete -f eventing-helloa-sub.yaml
$ kubectl -n chapter-4 delete -f eventing-hellob-sink.yaml
$ kubectl -n chapter-4 delete -f eventing-hellob-sub.yaml
$ kubectl -n chapter-4 delete -f eventinghello-source-ch.yaml

4.10 Using Knative Eventing Brokers and Triggers

Problem

You need event filtering because, by default, all the events flowing through a Channel will be sent to all Subscribers. In some cases, the Subscriber wishes to receive only a set of messages based on certain criteria.

Solution

Use the Knative Eventing Broker and Trigger Custom Resources to allow for CloudEvent attribute filtering.

The recipe is as follows:

  1. Inject the default Broker

  2. Create at least two Sink Services

  3. Create Triggers to register your Sink Services with the Channel

  4. Push some events

Labeling the chapter-4 namespace with knative-eventing-injection=enabled as shown in the following code will make Knative Eventing deploy a default Knative Eventing Broker and its related ingress:

$ kubectl label namespace chapter-4 knative-eventing-injection=enabled

Verify that the default Broker is running:

$ kubectl --namespace chapter-4 get broker
NAME     READY REASON URL                                                 AGE
default  True         http://default-broker.chapter-4.svc.cluster.local   22s

This will also start two additional pods named default-broker-filter and default-broker-ingress:

$ watch kubectl get pods
NAME                                         READY STATUS      AGE
default-broker-filter-c6654bccf-qb272        1/1   Running     18s
default-broker-ingress-7479966dc7-99xvm      1/1   Running     18s

Now that you have the Broker configured, you need to create the Sinks eventing​aloha and eventingbonjour, which will receive the filtered events.

Run the following command to deploy and verify the Knative Serving Services eventingaloha and eventingbonjour:

$ kubectl -n chapter-4 create -f eventing-aloha-sink.yaml
service.serving.knative.dev/eventingaloha created
$ kubectl -n chapter-4 create -f eventing-bonjour-sink.yaml
service.serving.knative.dev/eventingbonjour created
$ kubectl get ksvc
NAME              URL                                             READY
eventingaloha     http://eventingaloha.myeventing.example.com     True
eventingbonjour   http://eventingbonjour.myeventing.example.com   True
Note

The image being used by both of these services is identical. However, the difference between the names aloha and bonjour will make obvious which one is receiving a particular event.

$ watch kubectl get pods
NAME                                                    READY   STATUS    AGE
default-broker-filter-c6654bccf-6448v                   1/1     Running   8m40s
default-broker-ingress-74b49c78f4-mnskg                 1/1     Running   8m40s
eventingaloha-v1-deployment-9b46d459b-f8pfr             2/2     Running   30s
eventingbonjour-v1-deployment-fcd46b4dc-x6wvc           2/2     Running   18s

Wait approximately 60 seconds for eventingaloha and eventingbonjour to terminate and scale down to zero before proceeding.

Now create the Trigger for eventingaloha that will associate the filtered events to a service:

apiVersion: eventing.knative.dev/v1alpha1
kind: Trigger
metadata:
  name: helloaloha
spec:
  filter:
    attributes:
      type: greeting 1
  subscriber:
    ref:
     apiVersion: serving.knative.dev/v1alpha1
     kind: Service
     name: eventingaloha
1

The type is the CloudEvent type that is mapped to the ce-type HTTP header. A Trigger can filter by CloudEvent attributes such as type, source, or extension.

Now create the Trigger for eventingbonjour that will associate the filtered events to a service:

apiVersion: eventing.knative.dev/v1alpha1
kind: Trigger
metadata:
  name: hellobonjour
spec:
  filter:
    attributes:
      type: greeting
  subscriber:
    ref:
     apiVersion: serving.knative.dev/v1alpha1
     kind: Service
     name: eventingbonjour

Verify that your Triggers are ready:

$ kubectl -n chapter-4 create -f trigger-helloaloha.yaml
trigger.eventing.knative.dev/helloaloha created
$ kubectl -n chapter-4 create -f trigger-hellobonjour.yaml
trigger.eventing.knative.dev/hellobonjour created
$ kubectl get triggers
NAME         READY BROKER  SUBSCRIBER_URI                                     AGE
helloaloha   True  default http://eventingaloha.chapter-4.svc.cluster.local   24s
hellobonjour True  default http://eventingbonjour.chapter-4.svc.cluster.local 48s
Important

The preceding output has been modified for formatting purposes.

Discussion

Pull out the subscriberURI for eventhingaloha:

$ kubectl get trigger helloaloha -o jsonpath={.status.subscriberURI}
http://eventingaloha.chapter-4.svc.cluster.local

Pull out the subscriberURI for eventhingbonjour:

$ kubectl get trigger hellobonjour -o jsonpath={.status.subscriberURI}
http://eventingbonjour.chapter-4.svc.cluster.local

As well as the Broker’s subscriberURI:

$ kubectl get broker default -o jsonpath={.status.address.url}
http://default-broker.chapter-4.svc.cluster.local

You should notice that the subscriberURIs are Kubernetes services with the suffix of chapter-4.svc.cluster.local. This means they can be interacted with from another pod within the Kubernetes cluster.

Now that you have set up the Brokers and Triggers, you need to send in some test messages to see the behavior:

First, start streaming the logs for the event consumers:

$ stern eventing -c user-container

Then create a pod for using the curl command:

apiVersion: v1
kind: Pod
metadata:
  labels:
    run: curler
  name: curler
spec:
  containers:
  - name: curler
    image: fedora:29 1
    tty: true
1

You can use any image that includes a curl command.

Then exec into the curler pod:

$ kubectl -n chapter-4 apply -f curler.yaml
pod/curler created
$ kubectl -n chapter-4 exec -it curler -- /bin/bash

Using the curler pod’s shell, curl the subcriberURI for eventingaloha:

$ curl -v "http://eventingaloha.chapter-4.svc.cluster.local" \
-X POST \
-H "Ce-Id: say-hello" \
-H "Ce-Specversion: 1.0" \
-H "Ce-Type: aloha" \
-H "Ce-Source: mycurl" \
-H "Content-Type: application/json" \
-d {"key":"from a curl"}

You will then see eventingaloha will scale-up to respond to that event:

$ watch kubectl get pods
NAME                                        READY STATUS  AGE
curler                                      1/1   Running 59s
default-broker-filter-c6654bccf-vxm5m       1/1   Running 11m
default-broker-ingress-7479966dc7-pvtx6     1/1   Running 11m
eventingaloha-1-deployment-6cdc888d9d-9xnnn 2/2   Running 30s

Next, curl the subcriberURI for eventingbonjour:

$ curl -v "http://eventingbonjour.chapter-4.svc.cluster.local" \
-X POST \
-H "Ce-Id: say-hello" \
-H "Ce-Specversion: 1.0" \
-H "Ce-Type: bonjour" \
-H "Ce-Source: mycurl" \
-H "Content-Type: application/json" \
-d {"key":"from a curl"}

And you will see the eventingbonjour pod scale up:

$ watch kubectl get pods
NAME                                         READY STATUS  AGE
curler                                       1/1   Running 82s
default-broker-filter-c6654bccf-vxm5m        1/1   Running 11m
default-broker-ingress-7479966dc7-pvtx6      1/1   Running 11m
eventingaloha-1-deployment-6cdc888d9d-9xnnn  2/2   Running 53s
eventingbonjour-1-deployment-fc7858b5b-s9prj 2/2   Running 5s

Now, trigger both eventingaloha and eventingbonjour by curling the subcriberURI for the Broker:

$ curl -v "http://default-broker.chapter-4.svc.cluster.local" \
-X POST \
-H "Ce-Id: say-hello" \
-H "Ce-Specversion: 1.0" \
-H "Ce-Type: greeting" \
-H "Ce-Source: mycurl" \
-H "Content-Type: application/json" \
-d {"key":"from a curl"}
Note

“Ce-Type: greeting” is the key to insuring that both aloha and bonjour respond to this event.

And by watching the chapter-4 namespace, you will see both eventingaloha and eventingbonjour come to life:

$ watch kubectl get pods
NAME                                         READY STATUS  AGE
curler                                       1/1   Running 3m21s
default-broker-filter-c6654bccf-vxm5m        1/1   Running 13m
default-broker-ingress-7479966dc7-pvtx6      1/1   Running 13m
eventingaloha-1-deployment-6cdc888d9d-nlpm8  2/2   Running 6s
eventingbonjour-1-deployment-fc7858b5b-btdcr 2/2   Running 6s

You can experiment by using different types of filters in the Subscription to see how the different subscribed services respond. Filters may use a CloudEvent attribute for its criteria.

In this chapter, you have learned about Knative Eventing by understanding Event Sources, Event Sinks, and Event Channels; connecting Event Sources to Event Sinks; persisting messages in Event Channels; and filtering messages using Triggers.

In Chapter 6, you will be using filters to categorize events to Sink in and also learn how to enable Knative for observability, which will help you collect metrics and traces for real-world scenarios.

See Also

Knative Documentation on Brokers & Triggers

Get Knative Cookbook now with the O’Reilly learning platform.

O’Reilly members experience books, live events, courses curated by job role, and more from O’Reilly and nearly 200 top publishers.