第 12 章 使用连接器和 Worker 插件扩展 Kafka Connect 使用连接器和 Worker 插件扩展 Kafka Connect
本作品已使用人工智能进行翻译。欢迎您提供反馈和意见:translation-feedback@oreilly.com
Kafka Connect 有两种不同的插件,可以用来扩展其功能:
- 连接器插件
使用来影响特定管道的行为
- 工人插件
用于定制 Kafka Connect 运行时
Kafka Connect 附带了少量内置插件,Kafka 社区还构建了更多插件。在本章中,我们将介绍如何实现自己的连接器和 Worker 插件。在"实现连接器插件 "中,我们省略了连接器,因为我们将在第 11 章中介绍如何实现连接器。在考虑编写自己的插件之前,请查看是否已经有适合您的用例的插件。
无论实施哪种类型的插件,都有一些共同的注意事项。首先,连接器和 Worker 插件的构建和打包流程与连接器相同。我们将在"构建自定义连接器 "中介绍相关步骤。其次,在connect-api 和clients 包中包含了一些Exception 类,当您需要抛出Exception 时,可以在插件中使用这些类。其中两个类是DataException 和ConfigException 。您可以直接使用这些类,也可以对它们进行子类化。最后,与任何软件一样,测试你所实现的插件以确保它们能按预期运行是很重要的。
与连接器相比,连接器和 Worker 插件往往更容易测试,因为它们的生命周期要简单得多。在实例化之后,会调用configure() 方法,然后调用插件的主方法--apply() 用于转换,test() 用于谓词等--然后是close() 。插件的重要逻辑方法也较少,因此您可以将测试重点放在这些关键方法上,几乎完全依靠单元测试来验证其逻辑。
在许多情况下,只需使用单元测试,就能获得良好的覆盖率和对逻辑的信心。如果您计划在管道中组合多个插件,我们建议您进行一些集成测试,以验证它们是否能协同工作。在编写单元测试时,请务必注意运行时可能用来调用方法的不同参数组合。尤其要注意哪些参数可能是null 。例如,在Converter API 中,传给fromConnectData() 的 headers 对象始终不是null ,而值可以是null 。
让我们来看看实现不同连接器插件需要哪些条件。
实施连接器插件
除了连接器,还有四种用于扩展 Kafka Connect 的连接器插件:
TransformationPredicateConverterHeaderConverter
提示
默认情况下,GET /connector-plugins 端点只列出连接器。如果想查看当前安装在 Kafka Connect 运行时的所有连接器插件,从 Kafka 3.2 起,可以将connectorsOnly 标志设置为false :
curllocalhost:8083/connector-plugins?connectorsOnly=false
每种连接器插件类型都有自己的一个或多个类,这些类构成了您需要实现的 API;不过,这些类所包含的方法种类有一些重叠。为了完整起见,我们包含了所有方法;不过,对于如何实现config() 和configure() 方法,以及可以抛出哪些类型的Exception ,有一些通用的最佳实践。
要为连接器插件提供特定的配置设置,需要构建一个ConfigDef 对象。该对象与Connector API 中config() 方法返回的类型相同。在连接器插件的 ...
Become an O’Reilly member and get unlimited access to this title plus top books and audiobooks from O’Reilly and nearly 200 top publishers, thousands of courses curated by job role, 150+ live events each month,
and much more.
Read now
Unlock full access