Say Goodbye to Slow and Complex Integration Tests
Automate end-to-end data tests for any job or application
Generate
Generate production-like data to test your jobs or applications. Create data in files, databases, HTTP APIs and messaging systems.
Generate data in existing Postgres database
You have existing tables in Postgres and you want to generate data for them whilst maintaining relationships between tables.
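If you would rather define this in code than through the UI steps below, a minimal Scala sketch could look like the following. The postgres connection builder, the enableGeneratePlanAndTasks flag and the execute call are assumptions based on the patterns in the other examples here; the JDBC URL and credentials are placeholders for your own environment.
// Sketch only: connection builder, config flag and execute call assumed from the
// patterns in the other Scala examples; URL and credentials are placeholders
val postgresTask = postgres("my_postgres", "jdbc:postgresql://localhost:5432/customer", "postgres", "password")

// Read the existing table metadata (schemas and foreign keys) and auto-generate
// data that keeps the relationships between tables intact
val conf = configuration.enableGeneratePlanAndTasks(true)

execute(conf, postgresTask)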
- Click on Connection tab and add your Postgres connection
- Go back to Home tab and Select data source as your Postgres connection
- Click on Generate and select Auto
- Click on Execute to generate data
Sample
SELECT * FROM account.accounts LIMIT 1;
account_number | account_status | created_by | created_by | open_timestamp
---------------+----------------+---------------------+------------+------------------------
0499572486 | closed | Stewart Hartmann | eod | 2023-12-02 12:30:37.602
SELECT * FROM account.balances where account_number='0499572486';
account_number | balance | update_timestamp
---------------+-----------+------------------------
0499572486 | 285293.23 | 2024-01-30 03:30:29.012
SELECT * FROM account.transactions where account_number='0499572486';
account_number | amount | create_timestamp
---------------+----------+------------------------
0499572486 | 1893.46 | 2024-03-13 18:05:45.565
Create, get and delete pets via HTTP API using the same id
First, generate data for creating pets via POST, then get pets via GET and finally delete pets via DELETE, all using
the same id.
val httpTask = http("my_http")
.fields(metadataSource.openApi("/opt/app/http/petstore.json"))
.count(count.records(10))
val myPlan = plan.addForeignKeyRelationship(
foreignField("my_http", "POST/pets", "body.id"),
foreignField("my_http", "GET/pets/{id}", "pathParamid"),
foreignField("my_http", "DELETE/pets/{id}", "pathParamid")
)
var httpTask = http("my_http")
.fields(metadataSource().openApi("/opt/app/http/petstore.json"))
.count(count().records(10));
var myPlan = plan().addForeignKeyRelationship(
foreignField("my_http", "POST/pets", "body.id"),
foreignField("my_http", "GET/pets/{id}", "pathParamid"),
foreignField("my_http", "DELETE/pets/{id}", "pathParamid")
);
---
name: "http_openapi"
steps:
- name: "pets"
count:
records: 10
options:
metadataSourceType: "openApi"
schemaLocation: "/opt/app/http/petstore.json"
---
name: "http_plan"
tasks:
- name: "http_openapi"
dataSourceName: "http"
sinkOptions:
foreignKeys:
- source:
dataSource: "http"
step: "POST/pets"
fields: ["body.id"]
generate:
- dataSource: "http"
step: "GET/pets/{id}"
fields: ["pathParamid"]
- dataSource: "http"
step: "DELETE/pets/{id}"
fields: ["pathParamid"]
- Click on Connection tab, add your OpenAPI/Swagger connection to the file and add your HTTP connection
- Go back to Home tab and Select data source as your HTTP connection
- Click on Generate, select Auto with metadata source and then select your OpenAPI/Swagger connection
- Go to Relationships and click on + Relationship
  - For Source, select your task name, field as body.id, method as POST and endpoint as /pets
  - Click on Generation and + Link, select your task name, field as pathParamid, method as GET and endpoint as /pets/{id}
  - Click on + Link, select your task name, field as pathParamid, method as DELETE and endpoint as /pets/{id}
- Click on Advanced Configuration, open Flag and enable Generate Plan And Tasks
- Click on Execute to generate data
Populate Kafka topic with account events
Create fresh data in your Kafka topics for account events with nested structures.
val kafkaTask = kafka("my_kafka", "localhost:9092")
.topic("accounts")
.fields(field.name("key").sql("body.account_id"))
.fields(
field.messageBody(
field.name("account_id").regex("ACC[0-9]{8}"),
field.name("account_status").oneOf("open", "closed", "suspended", "pending"),
field.name("balance").`type`(DoubleType).round(2),
field.name("details")
.fields(
field.name("name").expression("#{Name.name}"),
field.name("open_date").`type`(DateType).min(LocalDate.now())
)
)
)
var kafkaTask = kafka("my_kafka", "localhost:9092")
.topic("accounts")
.fields(field().name("key").sql("body.account_id"))
.fields(
field().messageBody(
field().name("account_id").regex("ACC[0-9]{8}"),
field().name("account_status").oneOf("open", "closed", "suspended", "pending"),
field().name("balance").type(DoubleType.instance()).round(2),
field().name("details")
.fields(
field().name("name").expression("#{Name.name}"),
field().name("open_date").type(DateType.instance()).min(LocalDate.now())
)
)
)
---
name: "simple_kafka"
steps:
- name: "kafka_account"
type: "kafka"
options:
topic: "accounts"
fields:
- name: "key"
type: "string"
options:
sql: "body.account_id"
- name: "messageBody"
type: struct
fields:
- name: "account_id"
options:
regex: "ACC[0-9]{8}"
- name: "account_status"
options:
oneOf: ["open", "closed", "suspended", "pending"]
- name: "balance"
type: "double"
options:
round: 2
- name: "details"
type: struct
fields:
- name: "name"
- name: "open_date"
type: "date"
options:
min: "now()"
- Click on Connection tab, add your Kafka connection
- Go back to Home tab, Select data source as your Kafka connection and put topic as accounts
- Click on Generate and select Manual checkbox
- Click on + Field, add name key with type string
  - Click on +, select SQL and enter body.account_id
- Click on + Field, add name messageBody with type struct
  - Click on inner + Field, add name account_id with type string
    - Click on +, select Regex and enter ACC[0-9]{8}
  - Click on inner + Field, add name account_status with type string
    - Click on +, select One Of and enter open, closed, suspended, pending
  - Click on inner + Field, add name balance with type double
    - Click on +, select Round and enter 2
  - Click on inner + Field, add name details with type struct
    - Click on inner + Field, add name name with type string
    - Click on inner + Field, add name open_date with type date
      - Click on +, select Min and enter now()
- Click on Execute to generate data
Sample
[
{
"account_id":"ACC35554421",
"account_status":"open",
"balance":89139.62,
"details":{
"name":"Jonie Farrell",
"open_date":"2025-01-15"
}
},
{
"account_id":"ACC30149813",
"account_status":"closed",
"balance":28861.09,
"details":{
"name":"Debrah Douglas",
"open_date":"2025-01-17"
}
},
{
"account_id":"ACC58341320",
"account_status":"pending",
"balance":57543.91,
"details":{
"name":"Elmer Lind",
"open_date":"2025-01-20"
}
}
]
And Validate
Ensure your job or service is working as expected before going to production by generating data, ingesting it and then validating that the downstream data sources contain the correct information.
Check all generated records from CSV exist in Iceberg
Run data generation for a CSV file (based on the schema from a data contract), consume it with your job (which produces an Iceberg table) and then validate the resulting Iceberg table.
val csvTask = csv("csv_accounts", "/data/csv/customer/account", Map("header" -> "true"))
.fields(metadataSource.openDataContractStandard("/opt/app/mount/odcs/full-example.odcs.yaml"))
val icebergTask = iceberg("iceberg_accounts", "dev.accounts", "/data/iceberg/customer/account")
.validations(
validation.unique("account_id"),
validation.groupBy("account_id").sum("balance").greaterThan(0),
validation.field("open_time").isIncreasing(),
validation.count().isEqual(1000)
)
.validationWait(waitCondition.file("/data/iceberg/customer/account"))
var csvTask = csv("csv_accounts", "/data/csv/customer/account", Map.of("header", "true"))
.fields(metadataSource().openDataContractStandard("/opt/app/mount/odcs/full-example.odcs.yaml"));
var icebergTask = iceberg("iceberg_accounts", "dev.accounts", "/data/iceberg/customer/account")
.validations(
validation().unique("account_id"),
validation().groupBy("account_id").sum("balance").greaterThan(0),
validation().field("open_time").isIncreasing(),
validation().count().isEqual(1000)
)
.validationWait(waitCondition().file("/data/iceberg/customer/account"));
---
name: "csv_accounts"
steps:
- name: "accounts"
type: "csv"
options:
path: "/data/csv/customer/account"
metadataSourceType: "openDataContractStandard"
dataContractFile: "/opt/app/mount/odcs/full-example.odcs.yaml"
---
name: "iceberg_account_checks"
dataSources:
iceberg:
- options:
path: "/data/iceberg/customer/account"
validations:
- field: "account_id"
validation:
- type: "unique"
- field: "open_time"
validation:
- type: "isIncreasing"
- groupByFields: [ "account_id" ]
aggType: "sum"
aggExpr: "sum(balance) > 0"
- aggType: "count"
aggExpr: "count == 1000"
- Click on Connection tab, add your CSV, Iceberg and ODCS (Open Data Contract Standard) connections
- Go back to Home tab, enter task name as csv_accounts and Select data source as your CSV connection
- Click on Generate and select Auto from metadata source checkbox
  - Select your ODCS connection as the metadata source
- Click on + Task, select Iceberg and select your Iceberg connection
  - Click on + Validation, select Field, enter account_id and select Unique
  - Click on + Validation, select Group By and enter account_id
    - Click on +, select Sum and enter balance > 0
  - Click on + Validation, select Field and enter open_time
    - Click on +, select Is Increasing
  - Click on + Validation, select Group By and enter account_id
    - Click on +, select Count, click on + next to count, select Equal and enter 1000
- Click on Execute to generate data
Use validations from Great Expectations
If you have existing data quality rules from an external source like Great Expectations, you can use them to validate your data without rewriting them as part of your tests.
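As a code-based alternative to the UI steps below, a minimal Scala sketch could look like this. The json connection builder and the greatExpectations metadata source call are assumptions based on the patterns in the other examples; the JSON data path is a placeholder, while the expectations file path matches the one used in the UI steps.
// Sketch only: point validations at an existing Great Expectations suite
// instead of rewriting the rules as part of your tests
val jsonTask = json("my_json", "/opt/app/data/json")
  .validations(metadataSource.greatExpectations("/opt/app/mount/ge/taxi-expectations.json"))

execute(jsonTask)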
- Click on Connection tab, select data source type as Great Expectations
  - Enter Expectations file as /opt/app/mount/ge/taxi-expectations.json
- Click on Home tab, Select data source as your JSON connection
- Open Validation and select checkbox Auto from metadata source
  - Select your Great Expectations connection as the metadata source
- Click on Execute to generate data
Complex validations based on pre-conditions or upstream data
- Check balance is 0 when status is closed
- Check open_time is the same in CSV and Iceberg
- Check sum of amount in Iceberg is the same as balance in CSV for each account_id
val icebergTask = iceberg("iceberg_accounts", "dev.accounts", "/data/iceberg/customer/account")
.validations(
validation.preFilter(validation.field("status").isEqual("closed")).field("balance").isEqual(0),
validation.upstreamData(accountTask)
.joinFields("account_id")
.validations(
validation.field("open_time").isEqualField("csv_accounts.open_time"),
validation.groupBy("account_id", "csv_accounts_balance").sum("amount").isEqualField("csv_accounts_balance")
)
)
var icebergTask = iceberg("iceberg_accounts", "dev.accounts", "/data/iceberg/customer/account")
.validations(
validation().preFilter(validation().field("status").isEqual("closed")).field("balance").isEqual(0),
validation().upstreamData(accountTask)
.joinFields("account_id")
.validations(
validation().field("open_time").isEqualField("csv_accounts.open_time"),
validation().groupBy("account_id", "csv_accounts_balance").sum("amount").isEqualField("csv_accounts_balance")
)
);
---
name: "iceberg_account_checks"
dataSources:
iceberg:
- options:
path: "/data/iceberg/customer/account"
validations:
- preFilterExpr: "status == 'closed'"
expr: "balance == 0"
- upstreamDataSource: "csv_accounts"
joinFields: ["account_id"]
validations:
- expr: "open_time == csv_accounts.open_time"
- groupByFields: ["account_id", "csv_accounts_balance"]
aggType: "sum"
aggExpr: "sum(amount) == csv_accounts_balance"
- Click on + Task, select Iceberg and select your Iceberg connection
  - Pre-filter is not available yet via UI but will be soon!
- Click on + Validation, select Upstream and enter csv_accounts
  - Click on +, select Join Field(s) and enter account_id
  - Click on + Validation, select Field and enter open_time
    - Click on +, select Equal and enter csv_accounts.open_time
  - Click on + Validation, select Group By and enter account_id, csv_accounts_balance
    - Click on +, select Sum and enter amount
    - Click on +, select Equal and enter csv_accounts_balance
- Click on Execute to generate data
Why use Data Caterer
- Catch bugs before production: Bring stability to your data pipelines
- Speed up your development cycles: Fast feedback testing locally and in test environments
- Single tool for all data sources: No custom scripts needed
- No production data or connection required: Security-first approach, fully metadata driven
- Easy to use for testers and developers: Use either UI, Java, Scala or YAML
- Simulate complex data flows: Maintain relationships across data sources
Main features
- Connect to any data source
- Auto generate production-like data from data connections or metadata sources
- Relationships across data sources
- Validate based on data generated
- Clean up generated and downstream data
What it is
- Test data management tool
  Generate synthetic production-like data to be consumed and validated. Clean up the data after use to keep your environments clean.
- Run locally and in test environments
  Fast feedback loop for developers and testers to ensure the data is correct before going to production.
- Designed for any data source
  Support for pushing data to any data source, in any format, batch or real-time.
- High/Low/No code solution
  Use the tool via either the UI, Java, Scala or YAML.
- Developer productivity tool
  Whether you are a new developer or a seasoned veteran, cut down on your feedback loop when developing with data.
Who can use it
| Type | Interface | User |
|---|---|---|
| No Code | UI | QA, Testers, Data Scientists, Analysts |
| Low Code | YAML | DevOps, Kubernetes Fans |
| High Code | Java/Scala | Software Developers, Data Engineers |