Generate Batch and Event Data
Creating a data generator for a Kafka topic with matching records in a CSV file.
Requirements
- 5 minutes
- Git
- Gradle
- Docker
Get Started
First, we will clone the data-caterer repo and navigate to the example folder, which already contains the base project setup required.
Kafka Setup
If you don't have your own Kafka instance up and running, you can set up and run the one configured in the docker folder via:
cd docker
docker-compose up -d kafka
docker exec docker-kafkaserver-1 kafka-topics --bootstrap-server localhost:9092 --list
Let's create a task for inserting data into the account-topic topic, which is already defined under docker/data/kafka/setup_kafka.sh.
Plan Setup
Create a new Java or Scala class.
- Java: src/main/java/io/github/datacatering/plan/MyAdvancedBatchEventJavaPlanRun.java
- Scala: src/main/scala/io/github/datacatering/plan/MyAdvancedBatchEventPlanRun.scala
Make sure your class extends PlanRun.
We will borrow the Kafka task that is already defined under the class KafkaPlanRun
or KafkaJavaPlanRun. You can go through the Kafka guide here for more details.
Schema
Let us set up the corresponding schema for the CSV file where we want to match the values that are generated for the Kafka messages.
This is a simple schema where we want to use the values and metadata that are already defined in the kafkaTask to
determine what the data will look like for the CSV file. Even if we defined some metadata here, it would be overridden
when we define our foreign key relationships.
Foreign Keys
From the above CSV schema, we note the following against the Kafka schema:
- account_number in CSV needs to match with account_id in Kafka
  - We see that account_id is referred to in the key field as field.name("key").sql("content.account_id")
- year needs to match with content.year in Kafka, which is a nested field
  - We can only do foreign key relationships with top level fields, not nested fields. So we define a new field
    called tmp_year, which will not appear in the final output for the Kafka messages but is used as an intermediate step:
    field.name("tmp_year").sql("content.year").omit(true)
- name needs to match with content.details.name in Kafka, also a nested field
  - Using the same logic as above, we define a temporary field called tmp_name, which will take the value of the nested
    field but will be omitted: field.name("tmp_name").sql("content.details.name").omit(true)
- payload represents the whole JSON message sent to Kafka, which matches the value field
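The temporary-field approach described above can be modelled in plain Java: copy a nested value up to a top-level field so it can take part in the relationship, then drop it before the record is written. This is a minimal sketch that represents a Kafka record as nested Maps; the class TempFieldSketch and its methods getPath, promote and omit are hypothetical and not part of the Data Caterer API, which handles this through the sql and omit options instead.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class TempFieldSketch {

    // Walks a dotted path such as "content.details.name" through nested maps.
    // Returns null if any step along the path is missing or is not a map.
    @SuppressWarnings("unchecked")
    static Object getPath(Map<String, Object> row, String path) {
        Object current = row;
        for (String part : path.split("\\.")) {
            if (!(current instanceof Map)) {
                return null;
            }
            current = ((Map<String, Object>) current).get(part);
        }
        return current;
    }

    // Models field.name(tmpName).sql(path): copies a nested value into a top-level field.
    static void promote(Map<String, Object> row, String tmpName, String path) {
        row.put(tmpName, getPath(row, path));
    }

    // Models .omit(true): returns a copy of the row without the temporary fields.
    static Map<String, Object> omit(Map<String, Object> row, List<String> tmpNames) {
        Map<String, Object> output = new LinkedHashMap<>(row);
        tmpNames.forEach(output::remove);
        return output;
    }
}
```

After promote, tmp_year and tmp_name sit at the top level next to key and value, so all four can be listed in the foreign key relationship; omit then strips them so the message written to Kafka keeps its original shape.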
Our foreign keys are therefore defined as below. Order is important when defining the lists of fields: the field at each index in the Kafka list must correspond to the field at the same index in the CSV list.
var myPlan = plan().addForeignKeyRelationship(
        kafkaTask, List.of("key", "tmp_year", "tmp_name", "value"),
        List.of(Map.entry(csvTask, List.of("account_number", "year", "name", "payload")))
);
var conf = configuration()
      .generatedReportsFolderPath("/opt/app/data/report");
execute(myPlan, conf, kafkaTask, csvTask);
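To illustrate why the order of the two field lists matters, the sketch below applies a positional mapping: the value of the source field at index i is copied into the target field at index i, replacing whatever the target had generated for it. ForeignKeySketch and applyForeignKey are hypothetical names, and this is only a single-record model of the behaviour described in this guide, not how Data Caterer implements it internally.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class ForeignKeySketch {

    // Copies sourceFields[i] from the source row into targetFields[i] of a copy of the target row.
    // Values the target already had for those fields (e.g. from its own schema metadata) are overridden.
    static Map<String, Object> applyForeignKey(
            Map<String, Object> source, List<String> sourceFields,
            Map<String, Object> target, List<String> targetFields) {
        if (sourceFields.size() != targetFields.size()) {
            throw new IllegalArgumentException("Source and target field lists must have the same length");
        }
        Map<String, Object> result = new LinkedHashMap<>(target);
        for (int i = 0; i < sourceFields.size(); i++) {
            result.put(targetFields.get(i), source.get(sourceFields.get(i)));
        }
        return result;
    }
}
```

With the lists used in this guide, key fills account_number, tmp_year fills year, tmp_name fills name and value fills payload. Swapping "year" and "name" in only the CSV list would put the account year into name and the account holder's name into year.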
Update docker/data/custom/plan/batch-event-plan.yaml:
name: "batch_event_plan"
description: "Generate batch and event data"
tasks:
  - name: "kafka_task"
    dataSourceName: "my_kafka"
  - name: "csv_task"
    dataSourceName: "my_csv"
sinkOptions:
  foreignKeys:
    - source:
        dataSource: "my_kafka"
        step: "account_topic"
        fields: ["key", "tmp_year", "tmp_name", "value"]
      generate:
        - dataSource: "my_csv"
          step: "csv_accounts"
          fields: ["account_number", "year", "name", "payload"]
Create file docker/data/custom/application.conf:
Run
Let's try running it.
cd ..
./run.sh
# input class MyAdvancedBatchEventJavaPlanRun or MyAdvancedBatchEventPlanRun
# after completing
docker exec docker-kafkaserver-1 kafka-console-consumer --bootstrap-server localhost:9092 --topic account-topic --from-beginning
It should look something like this.
{"account_id":"ACC03093143","year":2023,"amount":87990.37196728592,"details":{"name":"Nadine Heidenreich Jr.","first_txn_date":"2021-11-09","updated_by":{"user":"YfEyJCe8ohrl0j IfyT","time":"2022-09-26T20:47:53.404Z"}},"transactions":[{"txn_date":"2021-11-09","amount":97073.7914706189}]}
{"account_id":"ACC08764544","year":2021,"amount":28675.58758765888,"details":{"name":"Delila Beer","first_txn_date":"2021-05-19","updated_by":{"user":"IzB5ksXu","time":"2023-01-26T20:47:26.389Z"}},"transactions":[{"txn_date":"2021-10-01","amount":80995.23818711648},{"txn_date":"2021-05-19","amount":92572.40049217848},{"txn_date":"2021-12-11","amount":99398.79832225188}]}
{"account_id":"ACC62505420","year":2023,"amount":96125.3125884202,"details":{"name":"Shawn Goodwin","updated_by":{"user":"F3dqIvYp2pFtena4","time":"2023-02-11T04:38:29.832Z"}},"transactions":[]}
Let's also check if there is a corresponding record in the CSV file.
$ cat docker/sample/csv/account/part-0000* | grep ACC03093143
ACC03093143,2023,Nadine Heidenreich Jr.,"{\"account_id\":\"ACC03093143\",\"year\":2023,\"amount\":87990.37196728592,\"details\":{\"name\":\"Nadine Heidenreich Jr.\",\"first_txn_date\":\"2021-11-09\",\"updated_by\":{\"user\":\"YfEyJCe8ohrl0j IfyT\",\"time\":\"2022-09-26T20:47:53.404Z\"}},\"transactions\":[{\"txn_date\":\"2021-11-09\",\"amount\":97073.7914706189}]}"
Great! The account, year, name and payload all match up.
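If you would rather check the match programmatically than by eye, the sketch below splits one CSV line and compares its columns against values taken from the Kafka message. It assumes the quoting seen in the output above (the payload wrapped in double quotes, with inner quotes escaped as \"); CsvMatchSketch and its methods are hypothetical helpers, and the parser covers only this simple quoting style, not the full CSV format.

```java
import java.util.ArrayList;
import java.util.List;

class CsvMatchSketch {

    // Splits a CSV line on commas that are outside double quotes.
    // Inside quotes, a backslash escapes the next character (e.g. \" becomes ").
    static List<String> parseLine(String line) {
        List<String> fields = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        boolean inQuotes = false;
        for (int i = 0; i < line.length(); i++) {
            char c = line.charAt(i);
            if (inQuotes && c == '\\' && i + 1 < line.length()) {
                current.append(line.charAt(++i)); // keep the escaped character, drop the backslash
            } else if (c == '"') {
                inQuotes = !inQuotes;
            } else if (c == ',' && !inQuotes) {
                fields.add(current.toString());
                current.setLength(0);
            } else {
                current.append(c);
            }
        }
        fields.add(current.toString());
        return fields;
    }

    // True when the columns (account_number, year, name, payload) equal the expected Kafka values.
    static boolean matches(String csvLine, String accountId, int year, String name, String kafkaJson) {
        List<String> cols = parseLine(csvLine);
        return cols.size() == 4
                && cols.get(0).equals(accountId)
                && cols.get(1).equals(String.valueOf(year))
                && cols.get(2).equals(name)
                && cols.get(3).equals(kafkaJson);
    }
}
```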
Additional Topics
Order of execution
You may notice that the events are generated first, then the CSV file. This is because we passed kafkaTask into the
execute function before csvTask. You can change the order of execution by passing csvTask before kafkaTask into the
execute function.
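In the Java example, that means calling execute(myPlan, conf, csvTask, kafkaTask) instead. The sketch below only models the behaviour described here, namely that the tasks passed to execute run one after another in argument order; ExecutionOrderSketch and its Task class are hypothetical stand-ins, not Data Caterer classes.

```java
import java.util.ArrayList;
import java.util.List;

class ExecutionOrderSketch {

    // Hypothetical stand-in for a task: a name plus an action that "generates" its data.
    static final class Task {
        final String name;
        final Runnable generate;

        Task(String name, Runnable generate) {
            this.name = name;
            this.generate = generate;
        }
    }

    // Runs each task to completion before starting the next, in the order the tasks were passed in,
    // and returns the task names in the order they ran.
    static List<String> execute(Task... tasks) {
        List<String> order = new ArrayList<>();
        for (Task task : tasks) {
            task.generate.run();
            order.add(task.name);
        }
        return order;
    }
}
```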