Say Goodbye to Slow and Complex Integration Tests
Automate end-to-end data tests for any job or application
Generate
Generate production-like data to test your jobs or applications. Create data in files, databases, HTTP APIs and messaging systems.
Generate data in existing Postgres database
You have existing tables in Postgres and you want to generate data for them whilst maintaining relationships between tables.
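If you prefer one of the code APIs over the UI, a minimal Scala sketch of the same flow could look like the following. The connection name, JDBC URL and credentials are placeholders, and enableGeneratePlanAndTasks is assumed to correspond to the Generate Plan And Tasks flag referenced in the UI steps.

val postgresTask = postgres("my_postgres", "jdbc:postgresql://localhost:5432/customer", "postgres", "password")

val conf = configuration
  .enableGeneratePlanAndTasks(true)  // auto-detect table schemas (and relationships) from the existing database

execute(conf, postgresTask)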
- Click on the Connection tab and add your Postgres connection
- Go back to the Home tab and choose your Postgres connection under Select data source
- Click on Generate and select Auto
- Click on Execute to generate data
Sample
SELECT * FROM account.accounts LIMIT 1;
account_number | account_status | created_by | created_by | open_timestamp
---------------+----------------+---------------------+------------+------------------------
0499572486 | closed | Stewart Hartmann | eod | 2023-12-02 12:30:37.602
SELECT * FROM account.balances where account_number='0499572486';
account_number | balance | update_timestamp
---------------+-----------+------------------------
0499572486 | 285293.23 | 2024-01-30 03:30:29.012
SELECT * FROM account.transactions where account_number='0499572486';
account_number | amount | create_timestamp
---------------+----------+------------------------
0499572486 | 1893.46 | 2024-03-13 18:05:45.565
Create, get and delete pets via HTTP API using the same id
First, generate data for creating pets via POST, then get pets via GET and finally delete pets via DELETE, all using the same id.
val httpTask = http("my_http")
.fields(metadataSource.openApi("/opt/app/http/petstore.json"))
.count(count.records(10))
val myPlan = plan.addForeignKeyRelationship(
foreignField("my_http", "POST/pets", "body.id"),
foreignField("my_http", "GET/pets/{id}", "pathParamid"),
foreignField("my_http", "DELETE/pets/{id}", "pathParamid")
)
var httpTask = http("my_http")
.fields(metadataSource().openApi("/opt/app/http/petstore.json"))
.count(count().records(10));
var myPlan = plan().addForeignKeyRelationship(
foreignField("my_http", "POST/pets", "body.id"),
foreignField("my_http", "GET/pets/{id}", "pathParamid"),
foreignField("my_http", "DELETE/pets/{id}", "pathParamid")
);
---
name: "http_openapi"
steps:
- name: "pets"
count:
records: 10
options:
metadataSourceType: "openApi"
schemaLocation: "/opt/app/http/petstore.json"
---
name: "http_plan"
tasks:
- name: "http_openapi"
dataSourceName: "http"
sinkOptions:
foreignKeys:
- source:
dataSource: "http"
step: "POST/pets"
fields: ["body.id"]
generate:
- dataSource: "http"
step: "GET/pets/{id}"
fields: ["pathParamid"]
- dataSource: "http"
step: "DELETE/pets/{id}"
fields: ["pathParamid"]
- Click on the Connection tab, add your OpenAPI/Swagger connection pointing to the file, and add your HTTP connection
- Go back to the Home tab and choose your HTTP connection under Select data source
- Click on Generate, select Auto with metadata source and then select your OpenAPI/Swagger connection
- Go to Relationships and click on + Relationship
- For Source, select your task name, field as body.id, method as POST and endpoint as /pets
- Click on Generation and + Link, then select your task name, field as pathParamid, method as GET and endpoint as /pets/{id}
- Click on + Link, then select your task name, field as pathParamid, method as DELETE and endpoint as /pets/{id}
- Click on Advanced Configuration, open Flag and enable Generate Plan And Tasks
- Click on Execute to generate data
Populate Kafka topic with account events
Create fresh data in your Kafka topics for account events with nested structures.
val kafkaTask = kafka("my_kafka", "localhost:9092")
.topic("accounts")
.fields(field.name("key").sql("body.account_id"))
.fields(
field.messageBody(
field.name("account_id").regex("ACC[0-9]{8}"),
field.name("account_status").oneOf("open", "closed", "suspended", "pending"),
field.name("balance").`type`(DoubleType).round(2),
field.name("details")
.fields(
field.name("name").expression("#{Name.name}"),
field.name("open_date").`type`(DateType).min(LocalDate.now())
)
)
)
var kafkaTask = kafka("my_kafka", "localhost:9092")
.topic("accounts")
.fields(field().name("key").sql("body.account_id"))
.fields(
field().messageBody(
field().name("account_id").regex("ACC[0-9]{8}"),
field().name("account_status").oneOf("open", "closed", "suspended", "pending"),
field().name("balance").type(DoubleType.instance()).round(2),
field().name("details")
.fields(
field().name("name").expression("#{Name.name}"),
field().name("open_date").type(DateType.instance()).min(LocalDate.now())
)
)
)
---
name: "simple_kafka"
steps:
- name: "kafka_account"
type: "kafka"
options:
topic: "accounts"
fields:
- name: "key"
type: "string"
options:
sql: "body.account_id"
- name: "messageBody"
type: struct
fields:
- name: "account_id"
options:
regex: "ACC[0-9]{8}"
- name: "account_status"
options:
oneOf: ["open", "closed", "suspended", "pending"]
- name: "balance"
type: "double"
options:
round: 2
- name: "details"
type: struct
fields:
- name: "name"
- name: "open_date"
type: "date"
options:
min: "now()"
- Click on the Connection tab and add your Kafka connection
- Go back to the Home tab, choose your Kafka connection under Select data source and set the topic to accounts
- Click on Generate and select the Manual checkbox
- Click on + Field, add name key with type string
- Click on +, select SQL and enter body.account_id
- Click on + Field, add name messageBody with type struct
- Click on the inner + Field, add name account_id with type string
- Click on +, select Regex and enter ACC[0-9]{8}
- Click on the inner + Field, add name account_status with type string
- Click on +, select One Of and enter open, closed, suspended, pending
- Click on the inner + Field, add name balance with type double
- Click on +, select Round and enter 2
- Click on the inner + Field, add name details with type struct
- Click on the inner + Field, add name name with type string
- Click on the inner + Field, add name open_date with type date
- Click on +, select Min and enter now()
- Click on Execute to generate data
Sample
[
{
"account_id":"ACC35554421",
"account_status":"open",
"balance":89139.62,
"details":{
"name":"Jonie Farrell",
"open_date":"2025-01-15"
}
},
{
"account_id":"ACC30149813",
"account_status":"closed",
"balance":28861.09,
"details":{
"name":"Debrah Douglas",
"open_date":"2025-01-17"
}
},
{
"account_id":"ACC58341320",
"account_status":"pending",
"balance":57543.91,
"details":{
"name":"Elmer Lind",
"open_date":"2025-01-20"
}
}
]
And Validate
Ensure your job or service is working as expected before going to production by generating data, ingesting it and then validating the downstream data sources have the correct information.
Check all generated records from CSV exist in Iceberg
Run data generation for a CSV file (based on the schema from a data contract), consume it with your job (which produces an Iceberg table) and then validate the Iceberg table.
val csvTask = csv("csv_accounts", "/data/csv/customer/account", Map("header" -> "true"))
.fields(metadataSource.openDataContractStandard("/opt/app/mount/odcs/full-example.odcs.yaml"))
val icebergTask = iceberg("iceberg_accounts", "dev.accounts", "/data/iceberg/customer/account")
.validations(
validation.unique("account_id"),
validation.groupBy("account_id").sum("balance").greaterThan(0),
validation.field("open_time").isIncreasing(),
validation.count().isEqual(1000)
)
.validationWait(waitCondition.file("/data/iceberg/customer/account"))
var csvTask = csv("csv_accounts", "/data/csv/customer/account", Map.of("header", "true"))
.fields(metadataSource().openDataContractStandard("/opt/app/mount/odcs/full-example.odcs.yaml"));
var icebergTask = iceberg("iceberg_accounts", "dev.accounts", "/data/iceberg/customer/account")
.validations(
validation().unique("account_id"),
validation().groupBy("account_id").sum("balance").greaterThan(0),
validation().field("open_time").isIncreasing(),
validation().count().isEqual(1000)
)
.validationWait(waitCondition().file("/data/iceberg/customer/account"));
---
name: "csv_accounts"
steps:
- name: "accounts"
type: "csv"
options:
path: "/data/csv/customer/account"
metadataSourceType: "openDataContractStandard"
dataContractFile: "/opt/app/mount/odcs/full-example.odcs.yaml"
---
name: "iceberg_account_checks"
dataSources:
iceberg:
- options:
path: "/data/iceberg/customer/account"
validations:
- field: "account_id"
validation:
- type: "unique"
- field: "open_time"
validation:
- type: "isIncreasing"
- groupByFields: [ "account_id" ]
aggType: "sum"
aggExpr: "sum(balance) > 0"
- aggType: "count"
aggExpr: "count == 1000"
- Click on the Connection tab and add your CSV, Iceberg and ODCS (Open Data Contract Standard) connections
- Go back to the Home tab, enter the task name as csv_accounts and choose your CSV connection under Select data source
- Click on Generate and select the Auto from metadata source checkbox
- Select your ODCS connection as the metadata source
- Click on + Task, select Iceberg and select your Iceberg connection
- Click on + Validation, select Field, enter account_id and select Unique
- Click on + Validation, select Group By and enter account_id
- Click on +, select Sum and enter balance > 0
- Click on + Validation, select Field and enter open_time
- Click on +, select Is Increasing
- Click on + Validation, select Group By and enter account_id
- Click on +, select Count, click on + next to count, select Equal and enter 1000
- Click on Execute to generate data
Use validations from Great Expectations
If you have existing data quality rules from an external source like Great Expectations, you can use them to validate your data without rewriting them as part of your tests.
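With the code APIs, a minimal Scala sketch could look like the following; the JSON task name and path are placeholders, and the greatExpectations metadata source helper is assumed to follow the same pattern as the openApi and openDataContractStandard helpers used earlier.

val jsonTask = json("my_json", "/opt/app/data/json")
  .validations(metadataSource.greatExpectations("/opt/app/mount/ge/taxi-expectations.json"))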
- Click on the Connection tab and select Great Expectations as the data source type
- Enter Expectations file as /opt/app/mount/ge/taxi-expectations.json
- Click on the Home tab and choose your JSON connection under Select data source
- Open Validation and select the Auto from metadata source checkbox
- Select your Great Expectations connection as the metadata source
- Click on Execute to generate data
Complex validations based on pre-conditions or upstream data
- Check balance is 0 when status is closed
- Check open_time is the same in CSV and Iceberg
- Check the sum of amount in Iceberg is the same as balance in CSV for each account_id
val icebergTask = iceberg("iceberg_accounts", "dev.accounts", "/data/iceberg/customer/account")
.validations(
validation.preFilter(validation.field("status").isEqual("closed")).field("balance").isEqual(0),
validation.upstreamData(accountTask)
.joinFields("account_id")
.validations(
validation.field("open_time").isEqualField("csv_accounts.open_time"),
validation.groupBy("account_id", "csv_accounts_balance").sum("amount").isEqualField("csv_accounts_balance")
)
)
var icebergTask = iceberg("iceberg_accounts", "dev.accounts", "/data/iceberg/customer/account")
.validations(
validation().preFilter(validation().field("status").isEqual("closed")).field("balance").isEqual(0),
validation().upstreamData(accountTask)
.joinFields("account_id")
.validations(
validation().field("open_time").isEqualField("csv_accounts.open_time"),
validation().groupBy("account_id", "csv_accounts_balance").sum("amount").isEqualField("csv_accounts_balance")
)
);
---
name: "iceberg_account_checks"
dataSources:
iceberg:
- options:
path: "/data/iceberg/customer/account"
validations:
- preFilterExpr: "status == 'closed'"
expr: "balance == 0"
- upstreamDataSource: "csv_accounts"
joinFields: ["account_id"]
validations:
- expr: "open_time == csv_accounts.open_time"
- groupByFields: ["account_id", "csv_accounts_balance"]
aggType: "sum"
aggExpr: "sum(amount) == csv_accounts_balance"
- Click on + Task, select Iceberg and select your Iceberg connection
- Pre-filter is not available via the UI yet but will be soon!
- Click on + Validation, select Upstream and enter csv_accounts
- Click on +, select Join Field(s) and enter account_id
- Click on + Validation, select Field and enter open_time
- Click on +, select Equal and enter csv_accounts.open_time
- Click on + Validation, select Group By and enter account_id, csv_accounts_balance
- Click on +, select Sum and enter amount
- Click on +, select Equal and enter csv_accounts_balance
- Click on Execute to generate data
Why use Data Caterer
- Catch bugs before production: Bring stability to your data pipelines
- Speed up your development cycles: Fast feedback testing locally and in test environments
- Single tool for all data sources: No custom scripts needed
- No production data or connection required: Security-first approach, fully metadata driven
- Easy to use for testers and developers: Use either UI, Java, Scala or YAML
- Simulate complex data flows: Maintain relationships across data sources
Main features
- Connect to any data source
- Auto generate production-like data from data connections or metadata sources
- Relationships across data sources
- Validate based on data generated
- Clean up generated and downstream data
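For the last point, clean-up is driven by configuration. A minimal Scala sketch is below; treat the exact flag names as assumptions shown only to illustrate the pattern.

val conf = configuration
  .enableRecordTracking(true)           // keep track of which records were generated
  .enableDeleteGeneratedRecords(true)   // delete the generated (and tracked downstream) data afterwards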
What it is
- Test data management tool: Generate synthetic production-like data to be consumed and validated. Clean up the data after use to keep your environments clean.
- Run locally and in test environments: Fast feedback loop for developers and testers to ensure the data is correct before going to production.
- Designed for any data source: Support for pushing data to any data source, in any format, batch or real-time.
- High/Low/No code solution: Use the tool via either UI, Java, Scala or YAML.
- Developer productivity tool: Whether you are a new developer or a seasoned veteran, cut down on your feedback loop when developing with data.
Who can use it
| Type      | Interface  | User                                   |
|-----------|------------|----------------------------------------|
| No Code   | UI         | QA, Testers, Data Scientists, Analysts |
| Low Code  | YAML       | DevOps, Kubernetes Fans                |
| High Code | Java/Scala | Software Developers, Data Engineers    |