Kafka
Creating a data generator for Kafka. You will build a Docker image that can populate data in Kafka for the topics you configure.
Requirements
- 10 minutes
- Git
- Gradle
- Docker
Get Started
First, clone the data-caterer-example repo, which already has the base project setup required.
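For example, via SSH (use the HTTPS URL if you prefer):

git clone git@github.com:data-catering/data-caterer-example.git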
If you already have a Kafka instance running, you can skip to the Plan Setup step.
Kafka Setup
Next, let's make sure you have an instance of Kafka up and running in your local environment. This will make it easy for us to iterate and check our changes.
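For example, using the docker-compose file bundled in the example repo (the kafka service name is an assumption; check the compose file):

cd data-caterer-example/docker
docker-compose up -d kafka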
Plan Setup
Create a file depending on which interface you want to use.
- Java: src/main/java/io/github/datacatering/plan/MyAdvancedKafkaJavaPlan.java
- Scala: src/main/scala/io/github/datacatering/plan/MyAdvancedKafkaPlan.scala
- YAML: docker/data/custom/plan/my-kafka.yaml
In docker/data/custom/plan/my-kafka.yaml:
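A minimal sketch of what this plan file could contain, assuming the standard plan format of name, description and tasks (the task and connection names match those used throughout this guide):

name: "my_kafka_plan"
description: "Create account data in Kafka"
tasks:
  - name: "kafka_task"
    dataSourceName: "my_kafka"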
- Click on Connection towards the top of the screen
- For connection name, set to my_kafka
- Click on Select data source type.. and select Kafka
- Set URL as localhost:9092
- Optionally, we could set a topic name, but if you have more than one topic, you would have to create a new connection for each topic
- Click on Create
- You should see your connection my_kafka show under Existing connections
- Click on Home towards the top of the screen
- Set plan name to my_kafka_plan
- Set task name to kafka_task
- Click on Select data source.. and select my_kafka
This class is where we define all of our configurations for generating data. There are helper variables and methods defined to make it simple and easy to use.
Connection Configuration
Within our class, we can start by defining the connection properties to connect to Kafka.
var accountTask = kafka(
        "my_kafka",       //name
        "localhost:9092", //url
        Map.of()          //optional additional connection options
);
Additional options can be found here.
val accountTask = kafka(
  "my_kafka",       //name
  "localhost:9092", //url
  Map()             //optional additional connection options
)
Additional options can be found here.
In docker/data/custom/application.conf:
- We have already created the connection details in this step
Schema
Let's create a task for inserting data into the account-topic that is already defined under docker/data/kafka/setup_kafka.sh. This topic should already be set up for you if you followed the Kafka Setup step. We can check if the topic is set up already via the following command:
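For example, when using the docker-compose Kafka (the container name is an assumption; adjust to match your setup):

docker exec docker-kafkaserver-1 kafka-topics --bootstrap-server localhost:9092 --list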
Trimming the connection details to work with the docker-compose Kafka, we have a base Kafka connection to define the topic we will publish to. Let's define each field along with its corresponding data type. You will notice that the text fields do not have a data type defined. This is because the default data type is StringType.
var kafkaTask = kafka("my_kafka", "kafkaserver:29092")
        .topic("account-topic")
        .fields(
                field().name("key").sql("body.account_id"),
                //field().name("partition").type(IntegerType.instance()), //can define message partition here
                field().messageHeaders(
                        field().messageHeader("account-id", "body.account_id"),
                        field().messageHeader("updated", "body.details.updated_by.time")
                )
        )
        .fields(
                field().messageBody(
                        field().name("account_id").regex("ACC[0-9]{8}"),
                        field().name("year").type(IntegerType.instance()).min(2021).max(2023),
                        field().name("amount").type(DoubleType.instance()),
                        field().name("details")
                                .fields(
                                        field().name("name").expression("#{Name.name}"),
                                        field().name("first_txn_date").type(DateType.instance()).sql("ELEMENT_AT(SORT_ARRAY(body.transactions.txn_date), 1)"),
                                        field().name("updated_by")
                                                .fields(
                                                        field().name("user"),
                                                        field().name("time").type(TimestampType.instance())
                                                )
                                ),
                        field().name("transactions").type(ArrayType.instance())
                                .fields(
                                        field().name("txn_date").type(DateType.instance()).min(Date.valueOf("2021-01-01")).max(Date.valueOf("2021-12-31")),
                                        field().name("amount").type(DoubleType.instance())
                                )
                )
        );
val kafkaTask = kafka("my_kafka", "kafkaserver:29092")
  .topic("account-topic")
  .fields(
    field.name("key").sql("body.account_id"),
    //field.name("partition").type(IntegerType), can define partition here
    field.messageHeaders(
      field.messageHeader("account-id", "body.account_id"),
      field.messageHeader("updated", "body.details.updated_by.time")
    )
  )
  .fields(
    field.messageBody(
      field.name("account_id").regex("ACC[0-9]{8}"),
      field.name("year").`type`(IntegerType).min(2021).max(2023),
      field.name("account_status").oneOf("open", "closed", "suspended", "pending"),
      field.name("amount").`type`(DoubleType),
      field.name("details").`type`(StructType)
        .fields(
          field.name("name").expression("#{Name.name}"),
          field.name("first_txn_date").`type`(DateType).sql("ELEMENT_AT(SORT_ARRAY(body.transactions.txn_date), 1)"),
          field.name("updated_by").`type`(StructType)
            .fields(
              field.name("user"),
              field.name("time").`type`(TimestampType)
            )
        ),
      field.name("transactions").`type`(ArrayType)
        .fields(
          field.name("txn_date").`type`(DateType).min(Date.valueOf("2021-01-01")).max(Date.valueOf("2021-12-31")),
          field.name("amount").`type`(DoubleType)
        )
    )
  )
In docker/data/custom/task/kafka/kafka-task.yaml:
name: "kafka_task"
steps:
- name: "kafka_account"
options:
topic: "account-topic"
fields:
- name: "key"
options:
sql: "body.account_id"
- name: "messageBody"
fields:
- name: "account_id"
- name: "year"
type: "int"
options:
min: "2021"
max: "2022"
- name: "amount"
type: "double"
options:
min: "10.0"
max: "100.0"
- name: "details"
fields:
- name: "name"
- name: "first_txn_date"
type: "date"
options:
sql: "ELEMENT_AT(SORT_ARRAY(body.transactions.txn_date), 1)"
- name: "updated_by"
fields:
- name: "user"
- name: "time"
type: "timestamp"
- name: "transactions"
type: "array"
fields:
- name: "txn_date"
type: "date"
- name: "amount"
type: "double"
- name: "messageHeaders"
fields:
- name: "account-id"
options:
sql: "body.account_id"
- name: "updated"
options:
sql: "body.details.update_by.time"
- Click on Generation and tick the Manual checkbox
- Click on + Field
- Add name as key
- Click on Select data type and select string
- Click + next to data type and select Sql, then enter body.account_id
- Click on + Field and add name as messageBody
- Click on Select data type and select struct
- Click on + Field under messageBody and add name as account_id
- Add additional fields under messageBody with your own metadata
- Click on + Field and add name as messageHeaders
- Click on Select data type and select struct
- Click on + Field under messageHeaders and add name as account_id
- Add additional fields under messageHeaders with your own metadata
Fields
The schema defined for Kafka has a format that needs to be followed as noted above. Specifically, the required fields are:
- messageBody
Whilst the other fields are optional:
- key
- partition
- messageHeaders
Message Headers
If your messages contain headers, you can follow the details below on generating header values. These can be based off values contained within your message body or could be static values, just like any other generated field. The main restriction imposed here is that the key of the message header is static and the value has to be a valid SQL expression.
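In Java, the header definition from the full example above looks like this (static "account-id" and "updated" keys, with SQL expressions referencing the message body for the values):

field().messageHeaders(
        field().messageHeader("account-id", "body.account_id"),
        field().messageHeader("updated", "body.details.updated_by.time")
)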
In docker/data/custom/task/kafka/kafka-task.yaml:
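The corresponding slice of the task file, matching the full definition shown earlier, would be:

- name: "messageHeaders"
  fields:
    - name: "account-id"
      options:
        sql: "body.account_id"
    - name: "updated"
      options:
        sql: "body.details.updated_by.time"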
- Click on + Field and add name as messageHeaders
- Click on Select data type and select struct
- Click on + Field under messageHeaders and add name as account_id
- Add additional fields under messageHeaders with your own metadata
transactions
transactions is an array that contains an inner structure of txn_date and amount. The size of the array generated can be controlled via arrayMinLength and arrayMaxLength.
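For example, in Java, a sketch of bounding the array size to between 1 and 5 elements, assuming the arrayMinLength/arrayMaxLength options named above map directly to builder methods:

field().name("transactions").type(ArrayType.instance())
        .arrayMinLength(1) //generate at least 1 element
        .arrayMaxLength(5) //generate at most 5 elements
        .fields(
                field().name("txn_date").type(DateType.instance()),
                field().name("amount").type(DoubleType.instance())
        )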
In docker/data/custom/task/kafka/kafka-task.yaml:
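A sketch of the same in YAML (the option keys are assumed to match the names above):

- name: "transactions"
  type: "array"
  options:
    arrayMinLength: "1"
    arrayMaxLength: "5"
  fields:
    - name: "txn_date"
      type: "date"
    - name: "amount"
      type: "double"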
Warning
Inner field definition for array type is currently not supported from the UI. Will be added in the near future!
details
details is another example of a nested schema structure, where it also has a nested structure itself in updated_by. One thing to note here is the first_txn_date field has a reference to the body.transactions array, where it will sort the array by txn_date and get the first element.
field().name("details")
.fields(
field().name("name").expression("#{Name.name}"),
field().name("first_txn_date").type(DateType.instance()).sql("ELEMENT_AT(SORT_ARRAY(body.transactions.txn_date), 1)"),
field().name("updated_by")
.fields(
field().name("user"),
field().name("time").type(TimestampType.instance())
)
)
In docker/data/custom/task/kafka/kafka-task.yaml:
name: "kafka_task"
steps:
- name: "kafka_account"
options:
topic: "account-topic"
fields:
- name: "messageBody"
fields:
- name: "details"
fields:
- name: "name"
- name: "first_txn_date"
type: "date"
options:
sql: "ELEMENT_AT(SORT_ARRAY(body.transactions.txn_date), 1)"
- name: "updated_by"
fields:
- name: "user"
- name: "time"
type: "timestamp"
- Click on + Field and add name as messageBody
- Click on Select data type and select struct
- Click on + Field under messageBody and add name as details
- Add additional fields under details with your own metadata
Additional Configurations
At the end of data generation, a report gets generated that summarises the actions it performed. We can control the output folder of that report via configurations.
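For the Java/Scala API, a minimal sketch (the generatedReportsFolderPath method follows Data Caterer's configuration builder pattern; the path is illustrative):

var config = configuration()
        .generatedReportsFolderPath("/opt/app/data/report"); //where the HTML report gets written

The config can then be passed along with the Kafka task when executing the plan.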
In docker/data/custom/application.conf:
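A sketch of what the relevant entry could look like (the folders block is an assumption; check the existing file for the exact structure):

folders {
    generatedReportsFolderPath = "/tmp/data-caterer/report"
}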
- Click on Advanced Configuration towards the bottom of the screen
- Click on Folder and enter /tmp/data-caterer/report for Generated Reports Folder Path
Run
Now we can run the script ./run.sh, found in the top-level directory of data-caterer-example, to run the class we just created.
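For example (passing the plan class name as an argument is an assumption; check the script for its exact usage):

./run.sh MyAdvancedKafkaJavaPlan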
- Click the button Execute at the top
- Progress updates will show in the bottom right corner
- Click on History at the top
- Check for your plan name and see the result summary
- Click on Report on the right side to see more details of what was executed
Your output should look like this.
{"account_id":"ACC56292178","year":2022,"amount":18338.627721151555,"details":{"name":"Isaias Reilly","first_txn_date":"2021-01-22","updated_by":{"user":"FgYXbKDWdhHVc3","time":"2022-12-30T13:49:07.309Z"}},"transactions":[{"txn_date":"2021-01-22","amount":30556.52125487579},{"txn_date":"2021-10-29","amount":39372.302259554635},{"txn_date":"2021-10-29","amount":61887.31389495968}]}
{"account_id":"ACC37729457","year":2022,"amount":96885.31758764731,"details":{"name":"Randell Witting","first_txn_date":"2021-06-30","updated_by":{"user":"HCKYEBHN8AJ3TB","time":"2022-12-02T02:05:01.144Z"}},"transactions":[{"txn_date":"2021-06-30","amount":98042.09647765031},{"txn_date":"2021-10-06","amount":41191.43564742036},{"txn_date":"2021-11-16","amount":78852.08184809204},{"txn_date":"2021-10-09","amount":13747.157653571106}]}
{"account_id":"ACC23127317","year":2023,"amount":81164.49304198896,"details":{"name":"Jed Wisozk","updated_by":{"user":"9MBFZZ","time":"2023-07-12T05:56:52.397Z"}},"transactions":[]}
Also check the HTML report, found at docker/sample/report/index.html, that gets generated to get an overview of what was executed.