Generate Batch and Event Data
Creating a data generator for Kafka topic with matching records in a CSV file.
Requirements
- 5 minutes
- Git
- Gradle
- Docker
Get Started
First, we will clone the data-caterer-example repo which will already have the base project setup required.
git clone git@github.com:data-catering/data-caterer-example.git
git clone git@github.com:data-catering/data-caterer-example.git
git clone git@github.com:data-catering/data-caterer-example.git
Kafka Setup
If you don't have your own Kafka up and running, you can set up and run an instance configured in the docker
folder via.
cd docker
docker-compose up -d kafka
docker exec docker-kafkaserver-1 kafka-topics --bootstrap-server localhost:9092 --list
Let's create a task for inserting data into the account-topic
that is already defined
underdocker/data/kafka/setup_kafka.sh
.
Plan Setup
Create a new Java or Scala class.
- Java:
src/main/java/io/github/datacatering/plan/MyAdvancedBatchEventJavaPlanRun.java
- Scala:
src/main/scala/io/github/datacatering/plan/MyAdvancedBatchEventPlanRun.scala
Make sure your class extends PlanRun
.
import io.github.datacatering.datacaterer.java.api.PlanRun;
...
public class MyAdvancedBatchEventJavaPlanRun extends PlanRun {
{
var kafkaTask = new AdvancedKafkaJavaPlanRun().getKafkaTask();
}
}
import io.github.datacatering.datacaterer.api.PlanRun
...
class MyAdvancedBatchEventPlanRun extends PlanRun {
val kafkaTask = new AdvancedKafkaPlanRun().kafkaTask
}
We will borrow the Kafka task that is already defined under the class AdvancedKafkaPlanRun
or AdvancedKafkaJavaPlanRun
. You can go through the Kafka guide here for more details.
Schema
Let us set up the corresponding schema for the CSV file where we want to match the values that are generated for the Kafka messages.
var kafkaTask = new AdvancedKafkaJavaPlanRun().getKafkaTask();
var csvTask = csv("my_csv", "/opt/app/data/csv/account")
.schema(
field().name("account_number"),
field().name("year"),
field().name("name"),
field().name("payload")
);
val kafkaTask = new AdvancedKafkaPlanRun().kafkaTask
val csvTask = csv("my_csv", "/opt/app/data/csv/account")
.schema(
field.name("account_number"),
field.name("year"),
field.name("name"),
field.name("payload")
)
This is a simple schema where we want to use the values and metadata that is already defined in the kafkaTask
to
determine what the data will look like for the CSV file. Even if we defined some metadata here, it would be overridden
when we define our foreign key relationships.
Foreign Keys
From the above CSV schema, we see note the following against the Kafka schema:
account_number
in CSV needs to match with theaccount_id
in Kafka- We see that
account_id
is referred to in thekey
column asfield.name("key").sql("content.account_id")
- We see that
year
needs to match withcontent.year
in Kafka, which is a nested field- We can only do foreign key relationships with top level fields, not nested fields. So we define a new column
called
tmp_year
which will not appear in the final output for the Kafka messages but is used as an intermediate stepfield.name("tmp_year").sql("content.year").omit(true)
- We can only do foreign key relationships with top level fields, not nested fields. So we define a new column
called
name
needs to match withcontent.details.name
in Kafka, also a nested field- Using the same logic as above, we define a temporary column called
tmp_name
which will take the value of the nested field but will be omittedfield.name("tmp_name").sql("content.details.name").omit(true)
- Using the same logic as above, we define a temporary column called
payload
represents the whole JSON message sent to Kafka, which matches tovalue
column
Our foreign keys are therefore defined like below. Order is important when defining the list of columns. The index needs to match with the corresponding column in the other data source.
var myPlan = plan().addForeignKeyRelationship(
kafkaTask, List.of("key", "tmp_year", "tmp_name", "value"),
List.of(Map.entry(csvTask, List.of("account_number", "year", "name", "payload")))
);
var conf = configuration()
.generatedReportsFolderPath("/opt/app/data/report");
execute(myPlan, conf, kafkaTask, csvTask);
val myPlan = plan.addForeignKeyRelationship(
kafkaTask, List("key", "tmp_year", "tmp_name", "value"),
List(csvTask -> List("account_number", "year", "name", "payload"))
)
val conf = configuration.generatedReportsFolderPath("/opt/app/data/report")
execute(myPlan, conf, kafkaTask, csvTask)
Run
Let's try run.
cd ..
./run.sh
#input class MyAdvancedBatchEventJavaPlanRun or MyAdvancedBatchEventPlanRun
#after completing
docker exec docker-kafkaserver-1 kafka-console-consumer --bootstrap-server localhost:9092 --topic account-topic --from-beginning
It should look something like this.
{"account_id":"ACC03093143","year":2023,"amount":87990.37196728592,"details":{"name":"Nadine Heidenreich Jr.","first_txn_date":"2021-11-09","updated_by":{"user":"YfEyJCe8ohrl0j IfyT","time":"2022-09-26T20:47:53.404Z"}},"transactions":[{"txn_date":"2021-11-09","amount":97073.7914706189}]}
{"account_id":"ACC08764544","year":2021,"amount":28675.58758765888,"details":{"name":"Delila Beer","first_txn_date":"2021-05-19","updated_by":{"user":"IzB5ksXu","time":"2023-01-26T20:47:26.389Z"}},"transactions":[{"txn_date":"2021-10-01","amount":80995.23818711648},{"txn_date":"2021-05-19","amount":92572.40049217848},{"txn_date":"2021-12-11","amount":99398.79832225188}]}
{"account_id":"ACC62505420","year":2023,"amount":96125.3125884202,"details":{"name":"Shawn Goodwin","updated_by":{"user":"F3dqIvYp2pFtena4","time":"2023-02-11T04:38:29.832Z"}},"transactions":[]}
Let's also check if there is a corresponding record in the CSV file.
$ cat docker/sample/csv/account/part-0000* | grep ACC03093143
ACC03093143,2023,Nadine Heidenreich Jr.,"{\"account_id\":\"ACC03093143\",\"year\":2023,\"amount\":87990.37196728592,\"details\":{\"name\":\"Nadine Heidenreich Jr.\",\"first_txn_date\":\"2021-11-09\",\"updated_by\":{\"user\":\"YfEyJCe8ohrl0j IfyT\",\"time\":\"2022-09-26T20:47:53.404Z\"}},\"transactions\":[{\"txn_date\":\"2021-11-09\",\"amount\":97073.7914706189}]}"
Great! The account, year, name and payload look to all match up.
Additional Topics
Order of execution
You may notice that the events are generated first, then the CSV file. This is because as part of the execute
function, we passed in the kafkaTask
first, before the csvTask
. You can change the order of execution by
passing in csvTask
before kafkaTask
into the execute
function.