Data Source Connections

The subsections below describe the supported connection configuration for each type of data source.

Connections can be defined either via the API or in a configuration file. Examples of both are shown for each data source below.

Supported Data Connections

| Data Source Type | Data Source | Support |
|---|---|---|
| Cloud Storage | AWS S3 | ✅ |
| Cloud Storage | Azure Blob Storage | ✅ |
| Cloud Storage | GCP Cloud Storage | ✅ |
| Database | Cassandra | ✅ |
| Database | MySQL | ✅ |
| Database | Postgres | ✅ |
| Database | Elasticsearch | |
| Database | MongoDB | |
| Database | Opensearch | |
| File | CSV | ✅ |
| File | Delta Lake | ✅ |
| File | Iceberg | ✅ |
| File | JSON | ✅ |
| File | ORC | ✅ |
| File | Parquet | ✅ |
| File | Hudi | |
| HTTP | REST API | ✅ |
| Messaging | Kafka | ✅ |
| Messaging | Solace | ✅ |
| Messaging | ActiveMQ | |
| Messaging | Pulsar | |
| Messaging | RabbitMQ | |
| Metadata | Data Contract CLI | ✅ |
| Metadata | Great Expectations | ✅ |
| Metadata | Marquez | ✅ |
| Metadata | OpenMetadata | ✅ |
| Metadata | OpenAPI/Swagger | ✅ |
| Metadata | Open Data Contract Standard (ODCS) | ✅ |
| Metadata | Amundsen | |
| Metadata | Datahub | |
| Metadata | Solace Event Portal | |

API

All connection details require a name. Depending on the data source, you can define additional options which may be used by the driver or connector for connecting to the data source.

Configuration file

All connection details follow the same pattern.

<connection format> {
    <connection name> {
        <key> = <value>
    }
}
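
To make the pattern above concrete, the following is a minimal sketch that renders a connection block from a format, a connection name, and a map of options. The class `ConnectionBlockSketch` and its `render` method are hypothetical helpers written for illustration only; they are not part of the tool's API, and the values used are taken from the CSV example further down this page.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ConnectionBlockSketch {
    // Hypothetical helper: renders one connection in the documented layout.
    // Values are written as quoted strings without escaping, so this only
    // suits simple values that contain no quotes or backslashes.
    public static String render(String format, String name, Map<String, String> options) {
        StringBuilder sb = new StringBuilder();
        sb.append(format).append(" {\n");
        sb.append("    ").append(name).append(" {\n");
        for (Map.Entry<String, String> option : options.entrySet()) {
            sb.append("        ")
              .append(option.getKey())
              .append(" = \"")
              .append(option.getValue())
              .append("\"\n");
        }
        sb.append("    }\n");
        sb.append("}\n");
        return sb.toString();
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps the keys in insertion order when rendered.
        Map<String, String> options = new LinkedHashMap<>();
        options.put("path", "/data/customer/transaction");
        System.out.print(render("csv", "customer_transactions", options));
    }
}
```

The outer key is the connection format, the inner key is the connection name, and each option becomes one `<key> = <value>` line.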

Overriding configuration

A configuration value can be overridden at runtime by a system property or environment variable. Define the default value first, then add an optional substitution on the following line:

url = "localhost"
url = ${?POSTGRES_URL}

The above defines that if there is a system property or environment variable named POSTGRES_URL, then that value will be used for the url, otherwise, it will default to localhost.
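
The fallback behaviour described above can be sketched in plain Java. This is an illustration of the lookup order only, not how the configuration library implements it: the class `OverrideSketch` and its `resolve` method are hypothetical, and checking the system property before the environment variable is an assumption, since the text above does not state which one wins when both are set.

```java
import java.util.Map;

public class OverrideSketch {
    // Hypothetical helper: returns the override for `name` if one exists,
    // otherwise the default. Assumption: a system property takes precedence
    // over an environment variable of the same name.
    public static String resolve(String name, String defaultValue, Map<String, String> env) {
        String fromProperty = System.getProperty(name);
        if (fromProperty != null) {
            return fromProperty;
        }
        String fromEnv = env.get(name);
        if (fromEnv != null) {
            return fromEnv;
        }
        return defaultValue;
    }

    public static void main(String[] args) {
        // Prints the value of POSTGRES_URL if it is set as a system property
        // or environment variable, otherwise the default "localhost".
        System.out.println(resolve("POSTGRES_URL", "localhost", System.getenv()));
    }
}
```

Passing the environment in as a map keeps the helper easy to exercise without changing the real process environment.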

Data sources

To find examples of a task for each type of data source, please check out this page.

File

Linked here is a list of generic options that can be included as part of your file data source configuration if required. Links to specific file type configurations can be found below.

CSV

Java:

csv("customer_transactions", "/data/customer/transaction")

Scala:

csv("customer_transactions", "/data/customer/transaction")

In application.conf:

csv {
  customer_transactions {
    path = "/data/customer/transaction"
    path = ${?CSV_PATH}
  }
}

Other available configuration for CSV can be found here

JSON

Java:

json("customer_transactions", "/data/customer/transaction")

Scala:

json("customer_transactions", "/data/customer/transaction")

In application.conf:

json {
  customer_transactions {
    path = "/data/customer/transaction"
    path = ${?JSON_PATH}
  }
}

Other available configuration for JSON can be found here

ORC

Java:

orc("customer_transactions", "/data/customer/transaction")

Scala:

orc("customer_transactions", "/data/customer/transaction")

In application.conf:

orc {
  customer_transactions {
    path = "/data/customer/transaction"
    path = ${?ORC_PATH}
  }
}

Other available configuration for ORC can be found here

Parquet

Java:

parquet("customer_transactions", "/data/customer/transaction")

Scala:

parquet("customer_transactions", "/data/customer/transaction")

In application.conf:

parquet {
  customer_transactions {
    path = "/data/customer/transaction"
    path = ${?PARQUET_PATH}
  }
}

Other available configuration for Parquet can be found here

Delta

Java:

delta("customer_transactions", "/data/customer/transaction")

Scala:

delta("customer_transactions", "/data/customer/transaction")

In application.conf:

delta {
  customer_transactions {
    path = "/data/customer/transaction"
    path = ${?DELTA_PATH}
  }
}

Iceberg

Java:

iceberg(
  "customer_accounts",              //name
  "account.accounts",               //table name
  "/opt/app/data/customer/iceberg", //warehouse path
  "hadoop",                         //catalog type
  "",                               //catalogUri
  Map.of()                          //additional options
);

Scala:

iceberg(
  "customer_accounts",              //name
  "account.accounts",               //table name
  "/opt/app/data/customer/iceberg", //warehouse path
  "hadoop",                         //catalog type
  "",                               //catalogUri
  Map()                             //additional options
)

In application.conf:

iceberg {
  customer_transactions {
    path = "/opt/app/data/customer/iceberg"
    path = ${?ICEBERG_WAREHOUSE_PATH}
    catalogType = "hadoop"
    catalogType = ${?ICEBERG_CATALOG_TYPE}
    catalogUri = ""
    catalogUri = ${?ICEBERG_CATALOG_URI}
  }
}

RDBMS

This follows the same configuration used by Spark, as found here.
A sample is shown below.

Java:

postgres(
    "customer_postgres",                            //name
    "jdbc:postgresql://localhost:5432/customer",    //url
    "postgres",                                     //username
    "postgres"                                      //password
)

Scala:

postgres(
    "customer_postgres",                            //name
    "jdbc:postgresql://localhost:5432/customer",    //url
    "postgres",                                     //username
    "postgres"                                      //password
)

In application.conf:

jdbc {
    customer_postgres {
        url = "jdbc:postgresql://localhost:5432/customer"
        url = ${?POSTGRES_URL}
        user = "postgres"
        user = ${?POSTGRES_USERNAME}
        password = "postgres"
        password = ${?POSTGRES_PASSWORD}
        driver = "org.postgresql.Driver"
    }
}

Ensure that the user has write permission so that generated data can be saved to the target tables.

SQL Permission Statements
GRANT INSERT ON <schema>.<table> TO <user>;

Postgres

See the example API or configuration definition for a Postgres connection above.

Permissions

The following permissions are required when generating plans and tasks:

SQL Permission Statements
GRANT SELECT ON information_schema.tables TO <user>;
GRANT SELECT ON information_schema.columns TO <user>;
GRANT SELECT ON information_schema.key_column_usage TO <user>;
GRANT SELECT ON information_schema.table_constraints TO <user>;
GRANT SELECT ON information_schema.constraint_column_usage TO <user>;

MySQL

Java:

mysql(
    "customer_mysql",                       //name
    "jdbc:mysql://localhost:3306/customer", //url
    "root",                                 //username
    "root"                                  //password
)

Scala:

mysql(
    "customer_mysql",                       //name
    "jdbc:mysql://localhost:3306/customer", //url
    "root",                                 //username
    "root"                                  //password
)

In application.conf:

jdbc {
    customer_mysql {
        url = "jdbc:mysql://localhost:3306/customer"
        user = "root"
        password = "root"
        driver = "com.mysql.cj.jdbc.Driver"
    }
}

Permissions

The following permissions are required when generating plans and tasks:

SQL Permission Statements
GRANT SELECT ON information_schema.columns TO <user>;
GRANT SELECT ON information_schema.statistics TO <user>;
GRANT SELECT ON information_schema.key_column_usage TO <user>;

Cassandra

This follows the same configuration as defined by the Spark Cassandra Connector, as found here.

Java:

cassandra(
    "customer_cassandra",   //name
    "localhost:9042",       //url
    "cassandra",            //username
    "cassandra",            //password
    Map.of()                //optional additional connection options
)

Scala:

cassandra(
    "customer_cassandra",   //name
    "localhost:9042",       //url
    "cassandra",            //username
    "cassandra",            //password
    Map()                   //optional additional connection options
)

In application.conf:

org.apache.spark.sql.cassandra {
    customer_cassandra {
        spark.cassandra.connection.host = "localhost"
        spark.cassandra.connection.host = ${?CASSANDRA_HOST}
        spark.cassandra.connection.port = "9042"
        spark.cassandra.connection.port = ${?CASSANDRA_PORT}
        spark.cassandra.auth.username = "cassandra"
        spark.cassandra.auth.username = ${?CASSANDRA_USERNAME}
        spark.cassandra.auth.password = "cassandra"
        spark.cassandra.auth.password = ${?CASSANDRA_PASSWORD}
    }
}
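
Note that the API call takes a single `host:port` url, whereas the configuration file sets the host and port as separate Spark Cassandra Connector options. The sketch below shows one way that mapping could look. `CassandraOptionsSketch` and `toConnectorOptions` are hypothetical names used for illustration and are not part of the tool; only the four option keys come from the configuration above.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class CassandraOptionsSketch {
    // Hypothetical helper: splits a "host:port" url and returns the option
    // keys used in the application.conf example above.
    public static Map<String, String> toConnectorOptions(String url, String username, String password) {
        int separator = url.lastIndexOf(':');
        if (separator <= 0 || separator == url.length() - 1) {
            throw new IllegalArgumentException("Expected host:port but got: " + url);
        }
        Map<String, String> options = new LinkedHashMap<>();
        options.put("spark.cassandra.connection.host", url.substring(0, separator));
        options.put("spark.cassandra.connection.port", url.substring(separator + 1));
        options.put("spark.cassandra.auth.username", username);
        options.put("spark.cassandra.auth.password", password);
        return options;
    }

    public static void main(String[] args) {
        Map<String, String> options = toConnectorOptions("localhost:9042", "cassandra", "cassandra");
        // Print each option in the same "key = value" layout as the config file.
        options.forEach((key, value) -> System.out.println(key + " = \"" + value + "\""));
    }
}
```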

Permissions

Ensure that the user has write permission so that generated data can be saved to the target tables. In CQL, write access (INSERT, UPDATE, DELETE) is granted through the MODIFY permission.

CQL Permission Statements
GRANT MODIFY ON <keyspace>.<table> TO <user>;

The following permissions are required when enabling configuration.enableGeneratePlanAndTasks(true), as it gathers metadata about tables and columns from the tables below.

CQL Permission Statements
GRANT SELECT ON system_schema.tables TO <user>;
GRANT SELECT ON system_schema.columns TO <user>;

Kafka

Define your Kafka bootstrap server to connect to and send generated data to the corresponding topics. The topic is set at the step level.
Further details can be found here.

Java:

kafka(
    "customer_kafka",   //name
    "localhost:9092"    //url
)

Scala:

kafka(
    "customer_kafka",   //name
    "localhost:9092"    //url
)

In application.conf:

kafka {
    customer_kafka {
        kafka.bootstrap.servers = "localhost:9092"
        kafka.bootstrap.servers = ${?KAFKA_BOOTSTRAP_SERVERS}
    }
}

The schema for pushing data to Kafka follows a specific top-level structure.
An example can be found here. You can define the key, value, headers, partition or topic by following the linked schema.

JMS

Uses a JNDI lookup to send messages to a JMS queue. Ensure that the messaging system you are using has your queue/topic registered via JNDI; otherwise, a connection cannot be created.

Java:

solace(
    "customer_solace",                                      //name
    "smf://localhost:55554",                                //url
    "admin",                                                //username
    "admin",                                                //password
    "default",                                              //vpn name
    "/jms/cf/default",                                      //connection factory
    "com.solacesystems.jndi.SolJNDIInitialContextFactory"   //initial context factory
)

Scala:

solace(
    "customer_solace",                                      //name
    "smf://localhost:55554",                                //url
    "admin",                                                //username
    "admin",                                                //password
    "default",                                              //vpn name
    "/jms/cf/default",                                      //connection factory
    "com.solacesystems.jndi.SolJNDIInitialContextFactory"   //initial context factory
)

In application.conf:

jms {
    customer_solace {
        initialContextFactory = "com.solacesystems.jndi.SolJNDIInitialContextFactory"
        connectionFactory = "/jms/cf/default"
        url = "smf://localhost:55555"
        url = ${?SOLACE_URL}
        user = "admin"
        user = ${?SOLACE_USER}
        password = "admin"
        password = ${?SOLACE_PASSWORD}
        vpnName = "default"
        vpnName = ${?SOLACE_VPN}
    }
}

HTTP

Define any username and/or password needed for the HTTP requests.
The URL is defined in the tasks so that generated data can be populated into it.

Java:

http(
    "customer_api", //name
    "admin",        //username
    "admin"         //password
)

Scala:

http(
    "customer_api", //name
    "admin",        //username
    "admin"         //password
)

In application.conf:

http {
    customer_api {
        user = "admin"
        user = ${?HTTP_USER}
        password = "admin"
        password = ${?HTTP_PASSWORD}
    }
}