Data Source Connections
The subsections below describe the connection configuration supported for each type of data source.
Connections can be defined via the API or in a configuration file. Examples of both are shown for each data source below.
Supported Data Connections
Data Source Type | Data Source | Support |
---|---|---|
Cloud Storage | AWS S3 | |
Cloud Storage | Azure Blob Storage | |
Cloud Storage | GCP Cloud Storage | |
Database | Cassandra | |
Database | MySQL | |
Database | Postgres | |
Database | Elasticsearch | |
Database | MongoDB | |
Database | Opensearch | |
File | CSV | |
File | Delta Lake | |
File | Iceberg | |
File | JSON | |
File | ORC | |
File | Parquet | |
File | Hudi | |
HTTP | REST API | |
Messaging | Kafka | |
Messaging | Solace | |
Messaging | ActiveMQ | |
Messaging | Pulsar | |
Messaging | RabbitMQ | |
Metadata | Data Contract CLI | |
Metadata | Great Expectations | |
Metadata | Marquez | |
Metadata | OpenMetadata | |
Metadata | OpenAPI/Swagger | |
Metadata | Open Data Contract Standard (ODCS) | |
Metadata | Amundsen | |
Metadata | Datahub | |
Metadata | Solace Event Portal | |
API
All connection details require a name. Depending on the data source, you can define additional options which may be used by the driver or connector for connecting to the data source.
Configuration file
All connection details follow the same pattern.
<connection format> {
<connection name> {
<key> = <value>
}
}
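As a rough illustration of the pattern above, the following Java sketch renders a connection block from a format, a connection name and a map of options. The `ConnectionBlockExample` class and its `render` helper are hypothetical and not part of the library's API; the example values are borrowed from the CSV section of this page.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, for illustration only: not part of the library.
final class ConnectionBlockExample {

    // Renders <format> { <name> { <key> = "<value>" ... } } as text.
    static String render(String format, String name, Map<String, String> options) {
        StringBuilder sb = new StringBuilder();
        sb.append(format).append(" {\n");
        sb.append("  ").append(name).append(" {\n");
        for (Map.Entry<String, String> option : options.entrySet()) {
            sb.append("    ").append(option.getKey())
              .append(" = \"").append(option.getValue()).append("\"\n");
        }
        sb.append("  }\n}\n");
        return sb.toString();
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps the options in insertion order.
        Map<String, String> options = new LinkedHashMap<>();
        options.put("path", "/data/customer/transaction");
        System.out.print(render("csv", "customer_transactions", options));
    }
}
```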
Overriding configuration
To allow a configuration value to be overridden at runtime by a system property or environment variable, define it as follows:
url = "localhost"
url = ${?POSTGRES_URL}
The above defines that if there is a system property or environment variable named POSTGRES_URL, then that value will be used for the url; otherwise, it will default to localhost.
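The fallback behaviour can be pictured with a small Java sketch. It is not the library's own resolver: the `OverrideExample` class and `resolve` method are hypothetical, and checking the system property before the environment variable is an assumption about precedence.

```java
import java.util.Optional;

// Hypothetical sketch of the override behaviour; not the library's resolver.
final class OverrideExample {

    // Returns the system property if set, else the environment variable,
    // else the supplied default. The lookup order is an assumption.
    static String resolve(String name, String defaultValue) {
        return Optional.ofNullable(System.getProperty(name))
                .or(() -> Optional.ofNullable(System.getenv(name)))
                .orElse(defaultValue);
    }

    public static void main(String[] args) {
        // Prints "localhost" unless POSTGRES_URL is defined at runtime.
        System.out.println(resolve("POSTGRES_URL", "localhost"));
    }
}
```

Running this with `-DPOSTGRES_URL=...` on the command line, or with `POSTGRES_URL` exported in the shell, would print the overriding value instead of the default.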
Data sources
To find examples of a task for each type of data source, please check out this page.
File
Linked here is a list of generic options that can be included as part of your file data source configuration if required. Links to specific file type configurations can be found below.
CSV
csv("customer_transactions", "/data/customer/transaction")
In application.conf:
csv {
customer_transactions {
path = "/data/customer/transaction"
path = ${?CSV_PATH}
}
}
Other available configuration for CSV can be found here
JSON
json("customer_transactions", "/data/customer/transaction")
In application.conf:
json {
customer_transactions {
path = "/data/customer/transaction"
path = ${?JSON_PATH}
}
}
Other available configuration for JSON can be found here
ORC
orc("customer_transactions", "/data/customer/transaction")
In application.conf:
orc {
customer_transactions {
path = "/data/customer/transaction"
path = ${?ORC_PATH}
}
}
Other available configuration for ORC can be found here
Parquet
parquet("customer_transactions", "/data/customer/transaction")
In application.conf:
parquet {
customer_transactions {
path = "/data/customer/transaction"
path = ${?PARQUET_PATH}
}
}
Other available configuration for Parquet can be found here
Delta
delta("customer_transactions", "/data/customer/transaction")
In application.conf:
delta {
customer_transactions {
path = "/data/customer/transaction"
path = ${?DELTA_PATH}
}
}
Iceberg
In Java:
iceberg(
    "customer_accounts", // name
    "account.accounts", // table name
    "/opt/app/data/customer/iceberg", // warehouse path
    "hadoop", // catalog type
    "", // catalogUri
    Map.of() // additional options
);
In Scala:
iceberg(
    "customer_accounts", // name
    "account.accounts", // table name
    "/opt/app/data/customer/iceberg", // warehouse path
    "hadoop", // catalog type
    "", // catalogUri
    Map() // additional options
)
In application.conf:
iceberg {
customer_transactions {
path = "/opt/app/data/customer/iceberg"
path = ${?ICEBERG_WAREHOUSE_PATH}
catalogType = "hadoop"
catalogType = ${?ICEBERG_CATALOG_TYPE}
catalogUri = ""
catalogUri = ${?ICEBERG_CATALOG_URI}
}
}
RDBMS
Follows the same configuration used by Spark, as found here.
A sample can be found below.
postgres(
    "customer_postgres", // name
    "jdbc:postgresql://localhost:5432/customer", // url
    "postgres", // username
    "postgres" // password
)
In application.conf:
jdbc {
customer_postgres {
url = "jdbc:postgresql://localhost:5432/customer"
url = ${?POSTGRES_URL}
user = "postgres"
user = ${?POSTGRES_USERNAME}
password = "postgres"
password = ${?POSTGRES_PASSWORD}
driver = "org.postgresql.Driver"
}
}
Ensure that the user has write permission so that the generated data can be saved to the target tables.
SQL Permission Statements
GRANT INSERT ON <schema>.<table> TO <user>;
Postgres
See the example API or configuration definition for a Postgres connection above.
Permissions
The following permissions are required when generating the plan and tasks:
SQL Permission Statements
GRANT SELECT ON information_schema.tables TO <user>;
GRANT SELECT ON information_schema.columns TO <user>;
GRANT SELECT ON information_schema.key_column_usage TO <user>;
GRANT SELECT ON information_schema.table_constraints TO <user>;
GRANT SELECT ON information_schema.constraint_column_usage TO <user>;
MySQL
mysql(
    "customer_mysql", // name
    "jdbc:mysql://localhost:3306/customer", // url
    "root", // username
    "root" // password
)
In application.conf:
jdbc {
customer_mysql {
url = "jdbc:mysql://localhost:3306/customer"
user = "root"
password = "root"
driver = "com.mysql.cj.jdbc.Driver"
}
}
Permissions
The following permissions are required when generating the plan and tasks:
SQL Permission Statements
GRANT SELECT ON information_schema.columns TO <user>;
GRANT SELECT ON information_schema.statistics TO <user>;
GRANT SELECT ON information_schema.key_column_usage TO <user>;
Cassandra
Follows the same configuration as defined by the Spark Cassandra Connector, as found here.
In Java:
cassandra(
    "customer_cassandra", // name
    "localhost:9042", // url
    "cassandra", // username
    "cassandra", // password
    Map.of() // optional additional connection options
)
In Scala:
cassandra(
    "customer_cassandra", // name
    "localhost:9042", // url
    "cassandra", // username
    "cassandra", // password
    Map() // optional additional connection options
)
In application.conf:
org.apache.spark.sql.cassandra {
customer_cassandra {
spark.cassandra.connection.host = "localhost"
spark.cassandra.connection.host = ${?CASSANDRA_HOST}
spark.cassandra.connection.port = "9042"
spark.cassandra.connection.port = ${?CASSANDRA_PORT}
spark.cassandra.auth.username = "cassandra"
spark.cassandra.auth.username = ${?CASSANDRA_USERNAME}
spark.cassandra.auth.password = "cassandra"
spark.cassandra.auth.password = ${?CASSANDRA_PASSWORD}
}
}
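The API example takes a single `host:port` url, while the configuration file sets the host and port separately. The Java sketch below shows one way the two forms could correspond. The `CassandraUrlExample` class and `toOptions` helper are hypothetical, and the mapping itself is an assumption rather than the library's documented behaviour.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, for illustration only: not part of the library.
final class CassandraUrlExample {

    // Splits "host:port" into the two connector option keys used in the
    // configuration file above. Assumes the port follows the last colon.
    static Map<String, String> toOptions(String url) {
        int separator = url.lastIndexOf(':');
        if (separator <= 0 || separator == url.length() - 1) {
            throw new IllegalArgumentException("Expected host:port but got: " + url);
        }
        Map<String, String> options = new LinkedHashMap<>();
        options.put("spark.cassandra.connection.host", url.substring(0, separator));
        options.put("spark.cassandra.connection.port", url.substring(separator + 1));
        return options;
    }

    public static void main(String[] args) {
        toOptions("localhost:9042")
                .forEach((key, value) -> System.out.println(key + " = \"" + value + "\""));
    }
}
```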
Permissions
Ensure that the user has write permission so that the generated data can be saved to the target tables. In CQL, write access (INSERT, UPDATE, DELETE) is granted through the MODIFY permission.
CQL Permission Statements
GRANT MODIFY ON <schema>.<table> TO <user>;
The following permissions are required when enabling configuration.enableGeneratePlanAndTasks(true), as it will gather metadata information about tables and columns from the tables below.
CQL Permission Statements
GRANT SELECT ON system_schema.tables TO <user>;
GRANT SELECT ON system_schema.columns TO <user>;
Kafka
Define your Kafka bootstrap server to connect and send generated data to the corresponding topics. The topic is set at the step level.
Further details can be found here.
kafka(
    "customer_kafka", // name
    "localhost:9092" // url
)
In application.conf:
kafka {
customer_kafka {
kafka.bootstrap.servers = "localhost:9092"
kafka.bootstrap.servers = ${?KAFKA_BOOTSTRAP_SERVERS}
}
}
When defining your schema for pushing data to Kafka, it follows a specific top level schema. An example can be found here. You can define the key, value, headers, partition or topic by following the linked schema.
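To make the shape of that top level schema concrete, here is a minimal Java sketch of a message holding the five fields named above. The `KafkaMessageExample` class is hypothetical, and the field types (strings, a string map for headers, a nullable integer partition) are assumptions; the linked schema remains the authoritative definition.

```java
import java.util.Map;

// Hypothetical value class, for illustration only: field types are assumed.
final class KafkaMessageExample {
    final String key;
    final String value;
    final Map<String, String> headers;
    final Integer partition; // null when no explicit partition is chosen
    final String topic;

    KafkaMessageExample(String key, String value, Map<String, String> headers,
                        Integer partition, String topic) {
        this.key = key;
        this.value = value;
        this.headers = headers;
        this.partition = partition;
        this.topic = topic;
    }

    public static void main(String[] args) {
        // The topic name and payload below are made-up sample values.
        KafkaMessageExample message = new KafkaMessageExample(
                "ACC123",
                "{\"account_id\":\"ACC123\"}",
                Map.of("source", "example"),
                null,
                "account-topic");
        System.out.println(message.topic + " <- " + message.key);
    }
}
```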
JMS
Uses a JNDI lookup to send messages to a JMS queue. Ensure that the messaging system you are using has your queue/topic registered via JNDI; otherwise, a connection cannot be created.
solace(
    "customer_solace", // name
    "smf://localhost:55554", // url
    "admin", // username
    "admin", // password
    "default", // vpn name
    "/jms/cf/default", // connection factory
    "com.solacesystems.jndi.SolJNDIInitialContextFactory" // initial context factory
)
In application.conf:
jms {
customer_solace {
initialContextFactory = "com.solacesystems.jndi.SolJNDIInitialContextFactory"
connectionFactory = "/jms/cf/default"
url = "smf://localhost:55555"
url = ${?SOLACE_URL}
user = "admin"
user = ${?SOLACE_USER}
password = "admin"
password = ${?SOLACE_PASSWORD}
vpnName = "default"
vpnName = ${?SOLACE_VPN}
}
}
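For readers unfamiliar with JNDI, the sketch below shows how connection settings like these are typically passed to a JNDI lookup using the standard `javax.naming` constants. The `JndiEnvironmentExample` class is hypothetical and this is not necessarily how the library wires the values internally; how the VPN name is supplied is provider specific and is left out here.

```java
import java.util.Hashtable;
import javax.naming.Context;

// Hypothetical sketch of a JNDI environment; not the library's own code.
final class JndiEnvironmentExample {

    // Builds the environment table expected by javax.naming.InitialContext.
    static Hashtable<String, String> environment(String initialContextFactory,
                                                 String url,
                                                 String user,
                                                 String password) {
        Hashtable<String, String> env = new Hashtable<>();
        env.put(Context.INITIAL_CONTEXT_FACTORY, initialContextFactory);
        env.put(Context.PROVIDER_URL, url);
        env.put(Context.SECURITY_PRINCIPAL, user);
        env.put(Context.SECURITY_CREDENTIALS, password);
        return env;
    }

    public static void main(String[] args) {
        Hashtable<String, String> env = environment(
                "com.solacesystems.jndi.SolJNDIInitialContextFactory",
                "smf://localhost:55555",
                "admin",
                "admin");
        // With the provider's JNDI jar on the classpath, the lookup would be:
        //   Context context = new javax.naming.InitialContext(env);
        //   Object connectionFactory = context.lookup("/jms/cf/default");
        System.out.println(env.get(Context.PROVIDER_URL));
    }
}
```

The lookup itself is left as a comment because it only succeeds when the messaging provider's client library is available and the connection factory is registered.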
HTTP
Define any username and/or password needed for the HTTP requests.
The URL is defined in the tasks so that generated data can be populated in the URL.
http(
    "customer_api", // name
    "admin", // username
    "admin" // password
)
In application.conf:
http {
customer_api {
user = "admin"
user = ${?HTTP_USER}
password = "admin"
password = ${?HTTP_PASSWORD}
}
}
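The split between connection-level credentials and a task-level URL can be sketched in Java as follows. This assumes the credentials are sent as HTTP Basic authentication and uses a made-up `{id}` placeholder syntax; neither is confirmed by this page, and the `HttpRequestExample` class is hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical sketch; Basic auth and the {placeholder} syntax are assumptions.
final class HttpRequestExample {

    // Builds the value of an Authorization header for HTTP Basic authentication.
    static String basicAuthHeader(String user, String password) {
        String credentials = user + ":" + password;
        return "Basic " + Base64.getEncoder()
                .encodeToString(credentials.getBytes(StandardCharsets.UTF_8));
    }

    // Substitutes a generated value into a URL template such as ".../{id}".
    static String fillUrl(String template, String placeholder, String value) {
        return template.replace("{" + placeholder + "}", value);
    }

    public static void main(String[] args) {
        System.out.println(basicAuthHeader("admin", "admin"));
        System.out.println(fillUrl("http://localhost:80/customers/{id}", "id", "42"));
    }
}
```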