Kafka Connect Transformations
Learn about transformations in Kafka Connect and how to use them.
Kafka Connect Transformations enable users to modify the structure, format, and content of data as it flows through the Kafka Connect framework. They act as intermediary steps in the data pipeline, allowing for data enrichment, filtering, mapping, and other data manipulation tasks. By applying transformations, Kafka Connect provides a powerful mechanism to shape and process data in real time to cater to the diverse requirements of different systems and applications.
Important use cases of transformations include the following:
Data normalization and enrichment: Incoming data from diverse sources may have different schemas, formats, or levels of data quality. Kafka Connect Transformations allow for data normalization, where disparate data can be transformed into a unified structure that conforms to the desired format or schema.
Filtering and routing: In many cases, not all data flowing through Kafka Connect is relevant for downstream systems or applications. Transformations allow filtering out unnecessary data based on predefined criteria, reducing the data volume and improving system efficiency. They also enable routing data to different Kafka topics or sinks based on specific conditions, allowing for targeted data delivery to different components within the architecture.
How to use transformations
A transformation chain in Kafka Connect allows us to apply multiple transformations sequentially to the data as it flows through the connector. Each transformation operates on the output of the previous transformation in the chain, allowing us to perform a series of data manipulation steps.
This chain can be specified in the connector configuration using the following attributes:
transforms
: This attribute is used to specify the list of aliases for the transformations and defines the order in which the transformations will be applied. Each transformation in the chain is assigned a unique alias, and the aliases are listed in thetransforms
attribute in the desired order of execution.transforms.$alias.type
: For each alias specified in thetransforms
attribute, this attribute is used to provide the fully qualified class name of the transformation. It defines the type of transformation that is applied to the data. Kafka Connect provides a set of built-in transformations, such asorg.apache.kafka.connect.transforms.InsertField
,org.apache.kafka.connect.transforms.ReplaceField
, etc., which can be specified here.transforms.$alias.$<specific configuration>
: This attribute allows us to configure the specific properties required by each transformation in the chain. The$<specific configuration>
placeholder should be replaced with the configuration property name relevant to the specific transformation. For example, if we use theInsertField
transformation, we can set properties liketransforms.$alias.static.field
,transforms.$alias.static.value
, etc., to define the static data to be inserted.
Use case for Kafka Connect Transformations
Let’s consider a scenario where a Kafka Connect Source connector reads data from a source system and needs to apply transformations before writing the data to a Kafka topic. In that case, we want to perform the following transformations in order:
Replace a field named
oldField
with a new field namednewField
.Insert a static field named
additionalField
with a value ofextra
.
To achieve this, we will configure the transformation chain in the connector configuration as follows:
Get hands-on with 1400+ tech skills courses.