Chapter 4. Knative Eventing
In this chapter, we present recipes that will help you get started with Knative Eventing. We will start with a high-level overview of the usage patterns and then drill down into specific steps to connect the various components together into end-to-end working examples.
As previously described in Chapter 1, Knative has two major subprojects: Serving and Eventing. With Serving you have dynamic autoscaling, including scaling down to zero pods, based on the absence of HTTP traffic load. With Eventing, you now have that same autoscaling capability but bridged into other protocols or from other sources beyond HTTP. For example, a barrage of messages flowing through an Apache Kafka topic can cause autoscaling of your Kubernetes-based service to handle those messages. Or perhaps a scheduled event via cron can cause your service to awake from its slumber and perform its duties.
Usage Patterns
There are three primary usage patterns with Knative Eventing:
- Source to Sink
  Source to Sink provides the simplest getting-started experience with Knative Eventing. It provides a single Sink—that is, an event-receiving service—with no queuing, backpressure, or filtering. Source to Sink does not support replies, which means the response from the Sink Service is ignored. As shown in Figure 4-1, the responsibility of the Event Source is just to deliver the message without waiting for a response from the Sink; hence it is appropriate to compare Source to Sink to the fire-and-forget messaging pattern.
- Channels and Subscriptions
  With Channels and Subscriptions, the Knative Eventing system defines a Channel, which can connect to various backends—such as In-Memory, Kafka, and GCP PubSub—for sourcing the events. Each Channel can have one or more Subscribers in the form of Sink Services, as shown in Figure 4-2, which can receive the event messages and process them as needed. Each message from the Channel is formatted as a CloudEvent and sent further up the chain to other Subscribers for further processing. The Channels and Subscriptions usage pattern does not have the ability to filter messages.
- Brokers and Triggers
  Brokers and Triggers are similar to Channels and Subscriptions, except that they support filtering of events. Event filtering allows Subscribers to register interest in a certain set of messages that flow into the Broker. For each Broker, Knative Eventing implicitly creates a Knative Eventing Channel. As shown in Figure 4-3, the Trigger subscribes itself to the Broker and applies a filter to the messages on its subscribed Broker. The filters are applied to the CloudEvent attributes of the messages before the messages are delivered to the interested Sink Services (Subscribers).
Before You Begin
All the recipes in this chapter will be executed from the directory $BOOK_HOME/eventing, so change to the recipe directory by running:

$ cd $BOOK_HOME/eventing
The recipes in this chapter will be deployed in the chapter-4 namespace, so switch to the chapter-4 namespace with the following command:

$ kubectl config set-context --current --namespace=chapter-4
The recipes in this chapter show how to do eventing with Knative and will help you understand how Knative Serving Services can respond to external events via Knative Eventing.
4.1 Producing Events with Eventing Sources
Solution
Knative Eventing Sources are software components that emit events. The job of a Source is to connect to, drain, capture, and potentially buffer events, often from an external system, and then relay those events to the Sink.
Knative Eventing installs the following four Sources out of the box:

$ kubectl api-resources --api-group=sources.eventing.knative.dev
NAME               APIGROUP                       NAMESPACED   KIND
apiserversources   sources.eventing.knative.dev   true         ApiServerSource
containersources   sources.eventing.knative.dev   true         ContainerSource
cronjobsources     sources.eventing.knative.dev   true         CronJobSource
sinkbindings       sources.eventing.knative.dev   true         SinkBinding
Discussion
The ApiServerSource allows you to listen for Kubernetes API events, like those provided by kubectl get events.

The ContainerSource allows you to create your own container that emits events, which can be targeted at Sinks—your specific Service.

The CronJobSource allows you to specify a cron timer: a recurring task that will emit an event to your Sink on a periodic basis. The CronJobSource is often the easiest way to verify that Knative Eventing is working properly.

SinkBinding allows you to link any addressable Kubernetes resource so that it receives events from any other Kubernetes resource that produces events.
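To make the shape of a Source concrete, here is a hedged sketch of an ApiServerSource that forwards Kubernetes Event resources to the eventinghello Service used throughout this chapter. The name k8s-events-source and the events-sa ServiceAccount are illustrative assumptions rather than listings from this book's repository, and exact field names can vary between Knative releases:

apiVersion: sources.eventing.knative.dev/v1alpha1
kind: ApiServerSource
metadata:
  name: k8s-events-source          # hypothetical name
spec:
  serviceAccountName: events-sa    # assumed ServiceAccount with permission to watch Events
  mode: Resource                   # send the full resource payload rather than a reference
  resources:
    - apiVersion: v1
      kind: Event
  sink:
    ref:
      apiVersion: serving.knative.dev/v1alpha1
      kind: Service
      name: eventinghello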
There are many other Source types, and you can review the current list of Sources within the Knative documentation.
Before we take a deep dive into the recipes in this chapter, let’s quickly understand the structure of a Knative Event Source resource YAML:
apiVersion: sources.eventing.knative.dev/v1alpha1
kind: CronJobSource
metadata:
  name: eventinghello-cronjob-source
spec:
  schedule: "*/2 * * * *"
  data: '{"key": "every 2 mins"}'
  sink:
    ref:
      apiVersion: serving.knative.dev/v1alpha1
      kind: Service
      name: eventinghello
4.2 Receiving Events with Knative Eventing Sinks
Solution
A Knative Eventing Sink is how you specify the event receiver—that is, the consumer of the event. Sinks can be invoked directly in a point-to-point fashion by referencing them via the Event Source’s sink as shown here:
apiVersion: sources.eventing.knative.dev/v1alpha1
kind: CronJobSource
metadata:
  name: eventinghello-cronjob-source
spec:
  schedule: "*/2 * * * *"
  data: '{"key": "every 2 mins"}'
  sink:
    ref:
      apiVersion: serving.knative.dev/v1alpha1
      kind: Service
      name: eventinghello
Discussion
The Sinks can target one of your Services—your code that will receive an HTTP POST with a CloudEvent payload. However, the Sink is also very flexible; it might point to a Channel (see Recipe 4.9) or a Broker (see Recipe 4.10), allowing for a publish-subscribe messaging pattern with one or more potential receivers. The Sink is often your Knative or Kubernetes Service that wishes to react to a particular event.
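As a sketch of that flexibility, the CronJobSource from Recipe 4.1 could deliver into a Channel instead of a Service simply by changing the sink reference. The Channel name eventinghello-ch is borrowed from Recipe 4.9; everything else stays the same:

apiVersion: sources.eventing.knative.dev/v1alpha1
kind: CronJobSource
metadata:
  name: eventinghello-cronjob-source
spec:
  schedule: "*/2 * * * *"
  data: '{"key": "every 2 mins"}'
  sink:
    ref:
      apiVersion: messaging.knative.dev/v1alpha1   # a Channel rather than a Knative Service
      kind: Channel
      name: eventinghello-ch                       # Channel defined later, in Recipe 4.9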
4.3 Deploying a Knative Eventing Service
Solution
Your code will handle an HTTP POST as shown in the following listing, where the CloudEvent data is available as HTTP headers as well as in the body of the request:
@PostMapping("/")
public ResponseEntity<String> myPost(HttpEntity<String> http) {
    System.out.println("ce-id=" + http.getHeaders().get("ce-id"));
    System.out.println("ce-source=" + http.getHeaders().get("ce-source"));
    System.out.println("ce-specversion=" + http.getHeaders().get("ce-specversion"));
    System.out.println("ce-time=" + http.getHeaders().get("ce-time"));
    System.out.println("ce-type=" + http.getHeaders().get("ce-type"));
    System.out.println("content-type=" + http.getHeaders().getContentType());
    System.out.println("content-length=" + http.getHeaders().getContentLength());
    System.out.println("POST:" + http.getBody());
    return ResponseEntity.ok(""); // the original listing omits the return; any 2xx response acknowledges the event
}
The CloudEvents SDK provides class libraries and framework integrations for various language runtimes such as Go, Java, and Python.
Additional details on the CloudEvents-to-HTTP mapping can be found in the CloudEvents GitHub repository.
The following listing shows a simple Knative Service (Sink):
apiVersion: serving.knative.dev/v1alpha1
kind: Service
metadata:
  name: eventinghello
spec:
  template:
    metadata:
      name: eventinghello-v1
      annotations:
        autoscaling.knative.dev/target: "1"
    spec:
      containers:
        - image: quay.io/rhdevelopers/eventinghello:0.0.1
Discussion
You can deploy and verify that the eventinghello Sink Service has been deployed successfully by looking for READY marked as True:

$ kubectl -n chapter-4 apply -f eventing-hello-sink.yaml
service.serving.knative.dev/eventinghello created
$ kubectl get ksvc
NAME            URL                                            READY
eventinghello   http://eventinghello.myeventing.example.com    True
The default behavior of Knative Serving is that the very first deployment of a Knative Serving Service will automatically scale up to one pod, and after about 90 seconds it will autoscale down to zero pods.
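The roughly 90-second window is governed in part by the cluster-wide autoscaler settings. If you want to experiment with a different scale-to-zero delay, the following is a hedged sketch of the config-autoscaler ConfigMap in the knative-serving namespace; the scale-to-zero-grace-period key exists in Knative Serving, but the 2m value here is only an illustrative assumption, not a recommendation from this book:

apiVersion: v1
kind: ConfigMap
metadata:
  name: config-autoscaler
  namespace: knative-serving
data:
  # how long the last pod is kept around once it is no longer needed (illustrative value)
  scale-to-zero-grace-period: "2m"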
You can actively watch the pod lifecycle with the following command:
$ watch kubectl get pods

You can monitor the logs of the eventinghello pod with:

$ stern eventinghello -c user-container

Wait until eventinghello scales to zero pods before moving on.
4.4 Connecting a Source to the Service
Solution
Deploy a CronJobSource, as it is the easiest way to verify that Knative Eventing is responding to events correctly. To deploy a CronJobSource, run the following command:
$ kubectl -n chapter-4 apply -f eventinghello-source.yaml
cronjobsource.sources.eventing.knative.dev/eventinghello-cronjob-source created
$ kubectl -n chapter-4 get cronjobsource
NAME                           READY   AGE
eventinghello-cronjob-source   True    10s
Discussion
The deployment of a CronJobSource also produces a pod with a prefix of “cronjobsource-eventinghell” as shown:
$ watch kubectl get pods
NAME                                             READY   STATUS    AGE
cronjobsource-eventinghello-54b9ef12-2c2f-11ea   1/1     Running   14s
Based on our cron expression, after two minutes it will kick off an event that will cause the eventinghello pod to scale up, as shown in the following listing:
$ watch kubectl get pods
NAME                                            READY   STATUS    AGE
cronjobsource-eventinghell-54b9ef12-2c2f-11ea   1/1     Running   97s
eventinghello-v1-deployment-7cfcb664ff-r694p    2/2     Running   10s
After approximately 60 seconds, eventinghello will autoscale down to zero pods, as it is a Knative Serving Service that will only be available while it is actively receiving events:
$ watch kubectl get pods
NAME                                            READY   STATUS        AGE
cronjobsource-eventinghell-54b9ef12-2c2f-11ea   1/1     Running       2m28s
eventinghello-v1-deployment-7cfcb664ff-r694p    2/2     Terminating   65s
You can follow the logs to see the CloudEvent details by using stern:
$ stern eventinghello -c user-container
ce-id=a1e0cbea-8f66-4fa6-8f3c-e5590c4ee147
ce-source=/apis/v1/namespaces/chapter-4/cronjobsources/eventinghello-cronjob-source
ce-specversion=1.0
ce-time=2020-01-01T00:44:00.000889221Z
ce-type=dev.knative.cronjob.event
content-type=application/json
content-length=22
POST:{"key": "every 2 mins"}
Finally, when you are done with experimentation, simply delete the source and service:
$ kubectl -n chapter-4 delete -f eventinghello-source.yaml
cronjobsource.sources.eventing.knative.dev "eventinghello-cronjob-source" deleted
$ kubectl -n chapter-4 delete -f eventing-hello-sink.yaml
service.serving.knative.dev "eventinghello" deleted
$ kubectl get pods -n chapter-4
No resources found.
4.5 Deploying an Apache Kafka Cluster
Solution
One of the easiest ways to deploy an Apache Kafka cluster is to use Strimzi, an operator and set of CRDs that deploys Apache Kafka inside of a Kubernetes cluster.
Discussion
As part of the upcoming recipes in this chapter, we will be deploying a Knative Source (see Recipe 4.6) that will respond to Apache Kafka Topic messages (events). Before getting to those recipes, we first need to deploy Apache Kafka inside your Kubernetes cluster. The Strimzi Kubernetes operator can be used to deploy the Apache Kafka cluster in your Kubernetes cluster.
Run the following command to create the kafka namespace and deploy Apache Kafka into it:

$ kubectl create namespace kafka
$ curl -L \
  https://github.com/strimzi/strimzi-kafka-operator/releases/download/0.16.2/strimzi-cluster-operator-0.16.2.yaml \
  | sed 's/namespace: .*/namespace: kafka/' \
  | kubectl apply -f - -n kafka
Wait for the strimzi-cluster-operator to be running:

$ watch kubectl get pods -n kafka
NAME                                        READY   STATUS    AGE
strimzi-cluster-operator-85f596bfc7-7dgds   1/1     Running   1m2s
The Strimzi operator will have installed several Apache Kafka–related CRDs, which can be used to create Apache Kafka core resources such as topics, users, connectors, etc. You can verify the CRDs that are available by querying api-resources:

$ kubectl api-resources --api-group=kafka.strimzi.io
kafkabridges.kafka.strimzi.io        2019-12-28T14:53:14Z
kafkaconnects.kafka.strimzi.io       2019-12-28T14:53:14Z
kafkaconnects2is.kafka.strimzi.io    2019-12-28T14:53:14Z
kafkamirrormakers.kafka.strimzi.io   2019-12-28T14:53:14Z
kafkas.kafka.strimzi.io              2019-12-28T14:53:14Z
kafkatopics.kafka.strimzi.io         2019-12-28T14:53:14Z
kafkausers.kafka.strimzi.io          2019-12-28T14:53:14Z
Now with the Apache Kafka operator running, you can deploy and verify a single-node Apache Kafka cluster by running the command:
$ kubectl -n kafka apply -f kafka-broker-my-cluster.yaml
kafka.kafka.strimzi.io/my-cluster created
$ watch kubectl get pods -n kafka
NAME                                          READY   STATUS    AGE
my-cluster-entity-operator-7d677bdf7b-jpws7   3/3     Running   85s
my-cluster-kafka-0                            2/2     Running   110s
my-cluster-zookeeper-0                        2/2     Running   2m22s
strimzi-cluster-operator-85f596bfc7-7dgds     1/1     Running   4m22s
The Kubernetes CRD resource $BOOK_HOME/eventing/kafka-broker-my-cluster.yaml will deploy a single Zookeeper, Kafka Broker, and an Entity-Operator. The Entity-Operator is responsible for managing different custom resources such as KafkaTopic and KafkaUser.
Now that you have an Apache Kafka cluster deployed, you can create a Kafka Topic using the KafkaTopic CRD. The following listing shows how to create a Kafka Topic named my-topic:
apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaTopic
metadata:
  name: my-topic
  labels:
    strimzi.io/cluster: my-cluster
spec:
  partitions: 10
  replicas: 1
Setting partitions: 10 allows for more concurrent scale-out of Sink pods. In theory, up to 10 pods will scale up if there are enough messages flowing through the Kafka Topic.
Note
You can choose to skip the manual pre-creation of a KafkaTopic but the automatically generated topics will have partitions set to 1 by default.
Create and verify the topic:
$ kubectl -n kafka create -f kafka-topic-my-topic.yaml
kafkatopic.kafka.strimzi.io/my-topic created
$ kubectl -n kafka get kafkatopics
NAME       PARTITIONS   REPLICATION FACTOR
my-topic   10           1
Verify that your Kafka Topic is working correctly by connecting a simple producer and consumer and creating some test messages. The sample code repository includes a script for producing Kafka messages called kafka-producer.sh. Execute the script and type in one, two, three, hitting Enter/Return after each string:
Producer:

$ $BOOK_HOME/bin/kafka-producer.sh
>one
>two
>three
4.6 Sourcing Apache Kafka Events with Knative Eventing
Solution
Use the Knative Eventing KafkaSource to have the Kafka messages flow through the Knative Eventing Channels. You can deploy the Knative KafkaSource by running the command:
$ kubectl apply \
  -f https://github.com/knative/eventing-contrib/releases/download/v0.12.2/kafka-source.yaml
The previous step deploys the Knative KafkaSource in the knative-sources namespace as well as a CRD, ServiceAccount, ClusterRole, etc. Verify that the knative-sources namespace includes the kafka-controller-manager-0 pod:
$ watch kubectl get pods -n knative-sources
NAME                         READY   STATUS    AGE
kafka-controller-manager-0   1/1     Running   1m17s
You should also deploy the Knative Kafka Channel that can be used to connect the Knative Eventing Channel with an Apache Kafka cluster backend. To deploy a Knative Kafka Channel, run:
$ curl -L "https://github.com/knative/eventing-contrib/releases/download/v0.12.2/kafka-channel.yaml" \
  | sed 's/REPLACE_WITH_CLUSTER_URL/my-cluster-kafka-bootstrap.kafka:9092/' \
  | kubectl apply --filename -
Note
“my-cluster-kafka-bootstrap.kafka:9092” comes from kubectl get services -n kafka.
Discussion
Look for three new pods in the knative-eventing namespace with the prefix “kafka”:
$ watch kubectl get pods -n knative-eventing
NAME                                   READY   STATUS    AGE
eventing-controller-666b79d867-kq8cc   1/1     Running   64m
eventing-webhook-5867c98d9b-hzctw      1/1     Running   64m
imc-controller-7c4f9945d7-s59xd        1/1     Running   64m
imc-dispatcher-7b55b86649-nsjm2        1/1     Running   64m
kafka-ch-controller-7c596b6b55-fzxcx   1/1     Running   33s
kafka-ch-dispatcher-577958f994-4f2qs   1/1     Running   33s
kafka-webhook-74bbd99f5c-c84ls         1/1     Running   33s
sources-controller-694f8df9c4-pss2w    1/1     Running   64m
And you should also find some new api-resources as shown here:

$ kubectl api-resources --api-group=sources.eventing.knative.dev
NAME               APIGROUP                       NAMESPACED   KIND
apiserversources   sources.eventing.knative.dev   true         ApiServerSource
containersources   sources.eventing.knative.dev   true         ContainerSource
cronjobsources     sources.eventing.knative.dev   true         CronJobSource
kafkasources       sources.eventing.knative.dev   true         KafkaSource
sinkbindings       sources.eventing.knative.dev   true         SinkBinding
$ kubectl api-resources --api-group=messaging.knative.dev
NAME               SHORTNAMES   APIGROUP                NAMESPACED   KIND
channels           ch           messaging.knative.dev   true         Channel
inmemorychannels   imc          messaging.knative.dev   true         InMemoryChannel
kafkachannels      kc           messaging.knative.dev   true         KafkaChannel
parallels                       messaging.knative.dev   true         Parallel
sequences                       messaging.knative.dev   true         Sequence
subscriptions      sub          messaging.knative.dev   true         Subscription
Now that all of your infrastructure is configured, you can deploy the Knative Serving Service (Sink) by running the command:

$ kubectl apply -n chapter-4 -f eventing-hello-sink.yaml
service.serving.knative.dev/eventinghello created
$ kubectl get ksvc
NAME            URL                                      READY
eventinghello   http://eventinghello.kafka.example.com   True
Make sure to follow the logs using stern:

$ stern eventinghello -c user-container
The initial deployment of eventinghello will cause it to scale up to one pod. It will be around until it hits its scale-down time limit. Allow it to scale down to zero pods before continuing.
Create a KafkaSource for my-topic by connecting your Kafka Topic my-topic to eventinghello:
apiVersion: sources.eventing.knative.dev/v1alpha1
kind: KafkaSource
metadata:
  name: mykafka-source
spec:
  consumerGroup: knative-group
  bootstrapServers: my-cluster-kafka-bootstrap.kafka:9092
  topics: my-topic
  sink:
    ref:
      apiVersion: serving.knative.dev/v1alpha1
      kind: Service
      name: eventinghello
The value “my-cluster-kafka-bootstrap.kafka:9092” can be found via kubectl get -n kafka services. This is another example of a direct Source-to-Service delivery.
The deployment of the KafkaSource will result in a new pod prefixed with “mykafka-source”:
$ kubectl -n chapter-4 apply -f mykafka-source.yaml
kafkasource.sources.eventing.knative.dev/mykafka-source created
$ watch kubectl get pods
NAME                                    READY   STATUS    RESTARTS   AGE
mykafka-source-vxs2k-56548756cc-j7m7v   1/1     Running   0          11s
Important
Since we had test messages of “one,” “two,” and “three” from earlier, you might see the eventinghello service awaken to process those messages.
Wait for eventinghello to scale down to zero pods before moving on, and then push more Kafka messages into my-topic.

Let’s now start an Apache Kafka producer that will send a message to my-topic:

$ $BOOK_HOME/bin/kafka-producer.sh
And then enter the following JSON-formatted messages:
{"hello": "world"}
{"hola": "mundo"}
{"bonjour": "le monde"}
{"hey": "duniya"}
Note
Knative Eventing events through the KafkaSource must be JSON-formatted.
While doing so, make sure to monitor the logs of the eventinghello pod:
$ stern eventinghello -c user-container
ce-id=partition:1/offset:1
ce-source=/apis/v1/namespaces/kafka/kafkasources/mykafka-source#my-topic
ce-specversion=1.0
ce-time=2020-01-01T01:16:12.886Z
ce-type=dev.knative.kafka.event
content-type=application/json
content-length=17
POST:{"hey":"duniya"}
4.7 Autoscaling with Apache Kafka and Knative Eventing
Solution
You simply need to set the autoscaling target low enough by adding the annotation autoscaling.knative.dev/target: "1", while simultaneously pushing enough messages through the topic. You have already set the target to be 1 when deploying the eventinghello sink, as shown in the following listing:
apiVersion: serving.knative.dev/v1alpha1
kind: Service
metadata:
  name: eventinghello
spec:
  template:
    metadata:
      name: eventinghello-v1
      annotations:
        autoscaling.knative.dev/target: "1"
    spec:
      containers:
        - image: quay.io/rhdevelopers/eventinghello:0.0.1
Discussion
With a concurrency factor of 1, if you are able to push in a number of Kafka messages rapidly, you will see more than one eventinghello pod scaled up to handle the load.
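For comparison only—the recipe’s eventinghello manifest does not use it—Knative Serving also offers a hard concurrency limit through containerConcurrency on the revision template. The following is a hedged sketch under that assumption; with a hard limit of 1, each pod handles at most one request at a time, whereas the autoscaling.knative.dev/target annotation is a soft target:

apiVersion: serving.knative.dev/v1alpha1
kind: Service
metadata:
  name: eventinghello
spec:
  template:
    metadata:
      name: eventinghello-v1
    spec:
      containerConcurrency: 1      # hard per-pod limit, an alternative to the soft target annotation
      containers:
        - image: quay.io/rhdevelopers/eventinghello:0.0.1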
You simply need an application that allows you to push in messages rapidly. You will use the Kafka Spammer application to push in at least three messages. Run the following command to start the kafka-spammer pod:

$ kubectl -n kafka run kafka-spammer \
  --image=quay.io/rhdevelopers/kafkaspammer:1.0.2
You then exec into the running kafka-spammer pod by running the following command:

$ KAFKA_SPAMMER_POD=$(kubectl -n kafka get pod -l "run=kafka-spammer" \
  -o jsonpath={.items[0].metadata.name})
$ kubectl -n kafka exec -it $KAFKA_SPAMMER_POD -- /bin/sh
Use curl to send in three messages:

$ curl localhost:8080/3
You should see about three eventinghello pods coming to life, as shown in the following listing:
$ watch kubectl get pods
NAME                                           READY   STATUS    AGE
eventinghello-v1-deployment-65c9b9c7df-8rwqc   1/2     Running   6s
eventinghello-v1-deployment-65c9b9c7df-q7pcf   1/2     Running   4s
eventinghello-v1-deployment-65c9b9c7df-zht2t   1/2     Running   6s
kafka-spammer-77ccd4f9c6-sx5j4                 1/1     Running   26s
my-cluster-entity-operator-7d677bdf7b-jpws7    3/3     Running   27m
my-cluster-kafka-0                             2/2     Running   27m
my-cluster-zookeeper-0                         2/2     Running   28m
mykafka-source-vxs2k-56548756cc-j7m7v          1/1     Running   5m12s
strimzi-cluster-operator-85f596bfc7-7dgds      1/1     Running   30m
Note
The events are not being evenly distributed across the various eventinghello pods; the first pod up starts consuming them all immediately.
To close out the spammer, use exit and then delete its deployment:

$ kubectl delete -n kafka deployment kafka-spammer
4.8 Using a Kafka Channel as the Default Knative Channel
Solution
Persistence and durability are two very important features of any messaging-based architecture. Knative Channels have built-in support for durability of messages. Durability becomes ineffective, however, if the Knative Eventing Channel does not support persistence; without persistence, it will not be able to deliver messages to Subscribers that might be offline at the time of message delivery.
By default, all Knative Channels created by the Knative Eventing API use the InMemoryChannel (IMC), which does not have the capability to persist messages. To enable persistence we need to use one of the supported Channels, such as GCP PubSub, Kafka, or NATS (Neural Autonomic Transport System), as the default Knative Channel backend.
We installed Apache Kafka earlier in Recipe 4.6. Let’s now configure it to be the default Knative Channel backend:
apiVersion: v1
kind: ConfigMap
metadata:
  name: default-ch-webhook
  namespace: knative-eventing
data:
  default-ch-config: |
    clusterDefault:
      apiVersion: messaging.knative.dev/v1alpha1
      kind: InMemoryChannel
    namespaceDefaults:
      chapter-4:
        apiVersion: messaging.knative.dev/v1alpha1
        kind: KafkaChannel
        spec:
          numPartitions: 1
          replicationFactor: 1
For the cluster, we will still use the default InMemoryChannel. For the namespace chapter-4, all Knative Eventing Channels will use KafkaChannel as the default.
Run the following command to apply the Knative Eventing Channel configuration:
$ kubectl apply -f default-kafka-channel.yaml
Discussion
Since you have now made KafkaChannel the default for all Knative Eventing Channels in chapter-4, creating a Knative Eventing Channel in the chapter-4 namespace will result in a corresponding Kafka Topic being created. Let’s now verify it by creating a sample Channel as shown in the following listing:
$ cat <<EOF | kubectl apply -f -
apiVersion: messaging.knative.dev/v1alpha1
kind: Channel
metadata:
  name: my-events-ch
  namespace: chapter-4
spec: {}
EOF
When you now list the topics that are available in Kafka using the script $BOOK_HOME/bin/kafka-list-topics.sh, you should see a topic corresponding to your Channel my-events-ch:
$ $BOOK_HOME/bin/kafka-list-topics.sh
knative-messaging-kafka.chapter-4.my-events-ch
For each Knative Eventing Channel that you create, a Kafka Topic will be created. The topic’s name follows the convention knative-messaging-kafka.<your-channel-namespace>.<your-channel-name>.
4.9 Using Knative Channels and Subscriptions
Solution
Use Knative Eventing Channels and Subscriptions:
- Channels
  Channels are an event-forwarding and persistence layer where each Channel is a separate Kubernetes Custom Resource. A Channel may be backed by Apache Kafka or InMemoryChannel.
- Subscriptions
  Subscriptions are how you register your service to listen to a particular Channel.
The use of Channels and Subscriptions allows you to decouple the producers and consumers of events.
The recipe is as follows:
- Create a Channel
- Create a Source to Sink to the Channel
- Create at least two Sink Services
- Create Subscriptions to register your Sink Services with the Channel
Create a Channel:
apiVersion: messaging.knative.dev/v1alpha1
kind: Channel
metadata:
  name: eventinghello-ch
Verify that your Channel was created successfully:
$ kubectl -n chapter-4 create -f eventinghello-channel.yaml
channel.messaging.knative.dev/eventinghello-ch created
$ kubectl get ch
NAME               READY   URL
eventinghello-ch   True    http://eventinghello-ch-kn-channel.chapter-4.svc.cluster.local
Then you need to create a Source that will send events to the Channel:
apiVersion: sources.eventing.knative.dev/v1alpha1
kind: CronJobSource
metadata:
  name: my-cjs
spec:
  schedule: "*/2 * * * *"
  data: '{"message": "From CronJob Source"}'
  sink:
    ref:
      apiVersion: messaging.knative.dev/v1alpha1
      kind: Channel
      name: eventinghello-ch
The Channel API is in the api-group messaging.knative.dev. The kind is a Channel instead of a direct reference to a specific Service; the default implementation is the InMemoryChannel.
Deploy and verify that your CronJobSource is running:
$ kubectl -n chapter-4 create -f eventinghello-source-ch.yaml
cronjobsource.sources.eventing.knative.dev/my-cjs created
$ kubectl -n chapter-4 get cronjobsource
NAME     READY   AGE
my-cjs   True    8s
Now you create the Sink services that will become the Subscribers:
apiVersion: serving.knative.dev/v1alpha1
kind: Service
metadata:
  name: eventinghelloa
spec:
  template:
    metadata:
      name: eventinghelloa-v1
      annotations:
        autoscaling.knative.dev/target: "1"
    spec:
      containers:
        - image: quay.io/rhdevelopers/eventinghello:0.0.1

apiVersion: serving.knative.dev/v1alpha1
kind: Service
metadata:
  name: eventinghellob
spec:
  template:
    metadata:
      name: eventinghellob-v1
      annotations:
        autoscaling.knative.dev/target: "1"
    spec:
      containers:
        - image: quay.io/rhdevelopers/eventinghello:0.0.1
$ kubectl -n chapter-4 create -f eventing-helloa-sink.yaml
service.serving.knative.dev/eventinghelloa created
$ kubectl -n chapter-4 create -f eventing-hellob-sink.yaml
service.serving.knative.dev/eventinghellob created
Now create the appropriate Subscription for eventinghelloa to the Channel eventinghello-ch:
apiVersion: messaging.knative.dev/v1alpha1
kind: Subscription
metadata:
  name: eventinghelloa-sub
spec:
  channel:
    apiVersion: messaging.knative.dev/v1alpha1
    kind: Channel
    name: eventinghello-ch
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1alpha1
      kind: Service
      name: eventinghelloa
And create the appropriate Subscription for eventinghellob to the Channel eventinghello-ch:
apiVersion: messaging.knative.dev/v1alpha1
kind: Subscription
metadata:
  name: eventinghellob-sub
spec:
  channel:
    apiVersion: messaging.knative.dev/v1alpha1
    kind: Channel
    name: eventinghello-ch
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1alpha1
      kind: Service
      name: eventinghellob
$ kubectl -n chapter-4 create -f eventing-helloa-sub.yaml
subscription.messaging.knative.dev/eventinghelloa-sub created
$ kubectl -n chapter-4 create -f eventing-hellob-sub.yaml
subscription.messaging.knative.dev/eventinghellob-sub created
Discussion
If you wait approximately two minutes for the CronJobSource, you will see both eventinghelloa and eventinghellob begin to run:
$ watch kubectl get pods
NAME                                                        READY   STATUS    AGE
cronjobsource-my-cjs-93544f14-2bf9-11ea-83c7-08002737670c   1/1     Running   2m15s
eventinghelloa-1-deployment-d86bf4847-hvbk6                 2/2     Running   5s
eventinghellob-1-deployment-5c986c7586-4clpb                2/2     Running   5s
Once you are done with your experimentation, you can delete the event source my-cjs and the services eventinghelloa and eventinghellob:
$ kubectl -n chapter-4 delete -f eventing-helloa-sink.yaml
$ kubectl -n chapter-4 delete -f eventing-helloa-sub.yaml
$ kubectl -n chapter-4 delete -f eventing-hellob-sink.yaml
$ kubectl -n chapter-4 delete -f eventing-hellob-sub.yaml
$ kubectl -n chapter-4 delete -f eventinghello-source-ch.yaml
4.10 Using Knative Eventing Brokers and Triggers
Solution
Use the Knative Eventing Broker and Trigger Custom Resources to allow for CloudEvent attribute filtering.
The recipe is as follows:
- Inject the default Broker
- Create at least two Sink Services
- Create Triggers to register your Sink Services with the Broker
- Push some events
Labeling the chapter-4 namespace with knative-eventing-injection=enabled, as shown in the following code, will make Knative Eventing deploy a default Knative Eventing Broker and its related ingress:
$ kubectl label namespace chapter-4 knative-eventing-injection=enabled
Verify that the default Broker is running:
$ kubectl --namespace chapter-4 get broker
NAME      READY   REASON   URL                                                  AGE
default   True             http://default-broker.chapter-4.svc.cluster.local   22s
This will also start two additional pods named default-broker-filter and default-broker-ingress:
$ watch kubectl get pods
NAME                                      READY   STATUS    AGE
default-broker-filter-c6654bccf-qb272     1/1     Running   18s
default-broker-ingress-7479966dc7-99xvm   1/1     Running   18s
Now that you have the Broker configured, you need to create the Sinks eventingaloha and eventingbonjour, which will receive the filtered events.
Run the following command to deploy and verify the Knative Serving Services eventingaloha and eventingbonjour:

$ kubectl -n chapter-4 create -f eventing-aloha-sink.yaml
service.serving.knative.dev/eventingaloha created
$ kubectl -n chapter-4 create -f eventing-bonjour-sink.yaml
service.serving.knative.dev/eventingbonjour created
$ kubectl get ksvc
NAME              URL                                             READY
eventingaloha     http://eventingaloha.myeventing.example.com     True
eventingbonjour   http://eventingbonjour.myeventing.example.com   True
Note
The image being used by both of these services is identical. However, the difference between the names aloha and bonjour will make it obvious which one is receiving a particular event.
$ watch kubectl get pods
NAME                                            READY   STATUS    AGE
default-broker-filter-c6654bccf-6448v           1/1     Running   8m40s
default-broker-ingress-74b49c78f4-mnskg         1/1     Running   8m40s
eventingaloha-v1-deployment-9b46d459b-f8pfr     2/2     Running   30s
eventingbonjour-v1-deployment-fcd46b4dc-x6wvc   2/2     Running   18s
Wait approximately 60 seconds for eventingaloha and eventingbonjour to terminate and scale down to zero before proceeding.
Now create the Trigger for eventingaloha that will associate the filtered events to a service:
apiVersion: eventing.knative.dev/v1alpha1
kind: Trigger
metadata:
  name: helloaloha
spec:
  filter:
    attributes:
      type: greeting
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1alpha1
      kind: Service
      name: eventingaloha
The type is the CloudEvent type that is mapped to the ce-type HTTP header. A Trigger can filter by CloudEvent attributes such as type, source, or extension.
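As a hedged sketch of multi-attribute filtering—not one of the recipe’s files—a Trigger can list several attributes in its filter, and only events matching all of them are delivered. The source value mycurl is borrowed from the curl examples later in this recipe, and the Trigger name is hypothetical:

apiVersion: eventing.knative.dev/v1alpha1
kind: Trigger
metadata:
  name: helloaloha-from-curl       # hypothetical Trigger name
spec:
  filter:
    attributes:
      type: greeting               # ce-type must equal "greeting"
      source: mycurl               # and ce-source must equal "mycurl"
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1alpha1
      kind: Service
      name: eventingaloha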
Now create the Trigger for eventingbonjour that will associate the filtered events to a service:
apiVersion: eventing.knative.dev/v1alpha1
kind: Trigger
metadata:
  name: hellobonjour
spec:
  filter:
    attributes:
      type: greeting
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1alpha1
      kind: Service
      name: eventingbonjour
Verify that your Triggers are ready:
$ kubectl -n chapter-4 create -f trigger-helloaloha.yaml
trigger.eventing.knative.dev/helloaloha created
$ kubectl -n chapter-4 create -f trigger-hellobonjour.yaml
trigger.eventing.knative.dev/hellobonjour created
$ kubectl get triggers
NAME           READY   BROKER    SUBSCRIBER_URI                                        AGE
helloaloha     True    default   http://eventingaloha.chapter-4.svc.cluster.local      24s
hellobonjour   True    default   http://eventingbonjour.chapter-4.svc.cluster.local    48s
Important
The preceding output has been modified for formatting purposes.
Discussion
Pull out the subscriberURI for eventingaloha:

$ kubectl get trigger helloaloha -o jsonpath={.status.subscriberURI}
http://eventingaloha.chapter-4.svc.cluster.local
Pull out the subscriberURI for eventingbonjour:

$ kubectl get trigger hellobonjour -o jsonpath={.status.subscriberURI}
http://eventingbonjour.chapter-4.svc.cluster.local
As well as the Broker’s URL:

$ kubectl get broker default -o jsonpath={.status.address.url}
http://default-broker.chapter-4.svc.cluster.local
You should notice that the subscriberURIs are Kubernetes Services with the suffix chapter-4.svc.cluster.local. This means they can be interacted with from another pod within the Kubernetes cluster.
Now that you have set up the Brokers and Triggers, you need to send in some test messages to see the behavior:
First, start streaming the logs for the event consumers:
$ stern eventing -c user-container
Then create a pod for using the curl command:
apiVersion: v1
kind: Pod
metadata:
  labels:
    run: curler
  name: curler
spec:
  containers:
    - name: curler
      image: fedora:29
      tty: true
Then exec into the curler pod:
$ kubectl -n chapter-4 apply -f curler.yaml
pod/curler created
$ kubectl -n chapter-4 exec -it curler -- /bin/bash
Using the curler pod’s shell, curl the subscriberURI for eventingaloha:

$ curl -v "http://eventingaloha.chapter-4.svc.cluster.local" \
  -X POST \
  -H "Ce-Id: say-hello" \
  -H "Ce-Specversion: 1.0" \
  -H "Ce-Type: aloha" \
  -H "Ce-Source: mycurl" \
  -H "Content-Type: application/json" \
  -d '{"key":"from a curl"}'
You will then see eventingaloha scale up to respond to that event:
$ watch kubectl get pods
NAME                                          READY   STATUS    AGE
curler                                        1/1     Running   59s
default-broker-filter-c6654bccf-vxm5m         1/1     Running   11m
default-broker-ingress-7479966dc7-pvtx6       1/1     Running   11m
eventingaloha-1-deployment-6cdc888d9d-9xnnn   2/2     Running   30s
Next, curl the subscriberURI for eventingbonjour:

$ curl -v "http://eventingbonjour.chapter-4.svc.cluster.local" \
  -X POST \
  -H "Ce-Id: say-hello" \
  -H "Ce-Specversion: 1.0" \
  -H "Ce-Type: bonjour" \
  -H "Ce-Source: mycurl" \
  -H "Content-Type: application/json" \
  -d '{"key":"from a curl"}'
And you will see the eventingbonjour pod scale up:
$ watch kubectl get pods
NAME                                           READY   STATUS    AGE
curler                                         1/1     Running   82s
default-broker-filter-c6654bccf-vxm5m          1/1     Running   11m
default-broker-ingress-7479966dc7-pvtx6        1/1     Running   11m
eventingaloha-1-deployment-6cdc888d9d-9xnnn    2/2     Running   53s
eventingbonjour-1-deployment-fc7858b5b-s9prj   2/2     Running   5s
Now, trigger both eventingaloha and eventingbonjour by curling the Broker’s URL:

$ curl -v "http://default-broker.chapter-4.svc.cluster.local" \
  -X POST \
  -H "Ce-Id: say-hello" \
  -H "Ce-Specversion: 1.0" \
  -H "Ce-Type: greeting" \
  -H "Ce-Source: mycurl" \
  -H "Content-Type: application/json" \
  -d '{"key":"from a curl"}'
Note
“Ce-Type: greeting” is the key to ensuring that both aloha and bonjour respond to this event.
And by watching the chapter-4 namespace, you will see both eventingaloha and eventingbonjour come to life:
$ watch kubectl get pods
NAME                                           READY   STATUS    AGE
curler                                         1/1     Running   3m21s
default-broker-filter-c6654bccf-vxm5m          1/1     Running   13m
default-broker-ingress-7479966dc7-pvtx6        1/1     Running   13m
eventingaloha-1-deployment-6cdc888d9d-nlpm8    2/2     Running   6s
eventingbonjour-1-deployment-fc7858b5b-btdcr   2/2     Running   6s
You can experiment by using different types of filters in the Trigger to see how the different subscribed services respond. Filters may use any CloudEvent attribute for their criteria.
In this chapter, you have learned about Knative Eventing by understanding Event Sources, Event Sinks, and Event Channels; connecting Event Sources to Event Sinks; persisting messages in Event Channels; and filtering messages using Triggers.
In Chapter 6, you will use filters to categorize events for Sinks, and you will also learn how to enable observability in Knative, which will help you collect metrics and traces for real-world scenarios.
See Also
Knative Documentation on Brokers & Triggers