Skip to content

Relationships/Foreign Keys

Multiple data source foreign key example

Foreign keys can be defined to represent the relationships between datasets where values are required to match for particular fields.

Single field

Define a field in one data source to match against another field.
Below example shows a postgres data source with two tables, accounts and transactions that have a foreign key for account_id.

var postgresAcc = postgres("my_postgres", "jdbc:...")
  .table("public.accounts")
  .schema(
    field().name("account_id"),
    field().name("name"),
    ...
  );
var postgresTxn = postgres(postgresAcc)
  .table("public.transactions")
  .schema(
    field().name("account_id"),
    field().name("full_name"),
    ...
  );

plan().addForeignKeyRelationship(
  postgresAcc, "account_id",
  List.of(Map.entry(postgresTxn, "account_id"))
);
val postgresAcc = postgres("my_postgres", "jdbc:...")
  .table("public.accounts")
  .schema(
    field.name("account_id"),
    field.name("name"),
    ...
  )
val postgresTxn = postgres(postgresAcc)
  .table("public.transactions")
  .schema(
    field.name("account_id"),
    field.name("full_name"),
    ...
  )

plan.addForeignKeyRelationship(
  postgresAcc, "account_id",
  List(postgresTxn -> "account_id")
)
---
name: "postgres_data"
steps:
  - name: "accounts"
    type: "postgres"
    options:
      dbtable: "account.accounts"
    schema:
      fields:
        - name: "account_id"
        - name: "name"
  - name: "transactions"
    type: "postgres"
    options:
      dbtable: "account.transactions"
    schema:
      fields:
        - name: "account_id"
        - name: "full_name"
---
name: "customer_create_plan"
description: "Create customers in JDBC"
tasks:
  - name: "postgres_data"
    dataSourceName: "my_postgres"

sinkOptions:
  foreignKeys:
    - - "postgres.accounts.account_id"
      - - "postgres.transactions.account_id"
      - []

Multiple fields

You may have a scenario where multiple fields need to be aligned. From the same example, we want account_id and name from accounts to match with account_id and full_name to match in transactions respectively.

var postgresAcc = postgres("my_postgres", "jdbc:...")
  .table("public.accounts")
  .schema(
    field().name("account_id"),
    field().name("name"),
    ...
  );
var postgresTxn = postgres(postgresAcc)
  .table("public.transactions")
  .schema(
    field().name("account_id"),
    field().name("full_name"),
    ...
  );

plan().addForeignKeyRelationship(
  postgresAcc, List.of("account_id", "name"),
  List.of(Map.entry(postgresTxn, List.of("account_id", "full_name")))
);
val postgresAcc = postgres("my_postgres", "jdbc:...")
  .table("public.accounts")
  .schema(
    field.name("account_id"),
    field.name("name"),
    ...
  )
val postgresTxn = postgres(postgresAcc)
  .table("public.transactions")
  .schema(
    field.name("account_id"),
    field.name("full_name"),
    ...
  )

plan.addForeignKeyRelationship(
  postgresAcc, List("account_id", "name"),
  List(postgresTxn -> List("account_id", "full_name"))
)
---
name: "postgres_data"
steps:
  - name: "accounts"
    type: "postgres"
    options:
      dbtable: "account.accounts"
    schema:
      fields:
        - name: "account_id"
        - name: "name"
  - name: "transactions"
    type: "postgres"
    options:
      dbtable: "account.transactions"
    schema:
      fields:
        - name: "account_id"
        - name: "full_name"
---
name: "customer_create_plan"
description: "Create customers in JDBC"
tasks:
  - name: "postgres_data"
    dataSourceName: "my_postgres"

sinkOptions:
  foreignKeys:
    - - "my_postgres.accounts.account_id,name"
      - - "my_postgres.transactions.account_id,full_name"
      - []

Transformed field

Scenarios exist where there are relationships defined by certain transformations being applied to the source data.

For example, there may be accounts created with a field account_number that contains records like 123456. Then another data source contains account_id which is a concatenation of ACC with account_number to have values like ACC123456.

var postgresAcc = postgres("my_postgres", "jdbc:...")
  .table("public.accounts")
  .schema(
    field().name("account_number"),
    field().name("name"),
    ...
  );
var jsonTask = json("my_json", "/tmp/json")
  .schema(
    field().name("account_id").sql("CONCAT('ACC', account_number)"),
    field().name("account_number").omit(true),  #using this field for intermediate calculation, not included in final result with omit=true
    ...
  );

plan().addForeignKeyRelationship(
  postgresAcc, List.of("account_number"),
  List.of(Map.entry(jsonTask, List.of("account_number")))
);
val postgresAcc = postgres("my_postgres", "jdbc:...")
  .table("public.accounts")
  .schema(
    field.name("account_number"),
    field.name("name"),
    ...
  )
var jsonTask = json("my_json", "/tmp/json")
  .schema(
    field.name("account_id").sql("CONCAT('ACC', account_number)"),
    field.name("account_number").omit(true),  #using this field for intermediate calculation, not included in final result with omit=true
    ...
  )

plan.addForeignKeyRelationship(
  postgresAcc, List("account_number"),
  List(jsonTask -> List("account_number"))
)
---
#postgres task yaml
name: "postgres_data"
steps:
  - name: "accounts"
    type: "postgres"
    options:
      dbtable: "account.accounts"
    schema:
      fields:
        - name: "account_number"
        - name: "name"
---
#json task yaml
name: "json_data"
steps:
  - name: "transactions"
    type: "json"
    options:
      dbtable: "account.transactions"
    schema:
      fields:
        - name: "account_id"
          generator:
            options:
              sql: "CONCAT('ACC', account_number)"
        - name: "account_number"
          generator:
            options:
              omit: true

---
#plan yaml
name: "customer_create_plan"
description: "Create customers in JDBC"
tasks:
  - name: "postgres_data"
    dataSourceName: "my_postgres"
  - name: "json_data"
    dataSourceName: "my_json"

sinkOptions:
  foreignKeys:
    - - "my_postgres.accounts.account_number"
      - - "my_json.transactions.account_number"
      - []

Nested field

Your schema structure can have nested fields which can also be referenced as foreign keys. But to do so, you need to create a proxy field that gets omitted from the final saved data.

In the example below, the nested customer_details.name field inside the json task needs to match with name from postgres. A new field in the json called _txn_name is used as a temporary field to facilitate the foreign key definition.

var postgresAcc = postgres("my_postgres", "jdbc:...")
  .table("public.accounts")
  .schema(
    field().name("account_id"),
    field().name("name"),
    ...
  );
var jsonTask = json("my_json", "/tmp/json")
  .schema(
    field().name("account_id"),
    field().name("customer_details")
      .schema(
        field().name("name").sql("_txn_name"), #nested field will get value from '_txn_name'
        ...
      ),
    field().name("_txn_name").omit(true)       #value will not be included in output
  );

plan().addForeignKeyRelationship(
  postgresAcc, List.of("account_id", "name"),
  List.of(Map.entry(jsonTask, List.of("account_id", "_txn_name")))
);
val postgresAcc = postgres("my_postgres", "jdbc:...")
  .table("public.accounts")
  .schema(
    field.name("account_id"),
    field.name("name"),
    ...
  )
var jsonTask = json("my_json", "/tmp/json")
  .schema(
    field.name("account_id"),
    field.name("customer_details")
      .schema(
        field.name("name").sql("_txn_name"), #nested field will get value from '_txn_name'
        ...
      ), 
    field.name("_txn_name").omit(true)       #value will not be included in output
  )

plan.addForeignKeyRelationship(
  postgresAcc, List("account_id", "name"),
  List(jsonTask -> List("account_id", "_txn_name"))
)
---
#postgres task yaml
name: "postgres_data"
steps:
  - name: "accounts"
    type: "postgres"
    options:
      dbtable: "account.accounts"
    schema:
      fields:
        - name: "account_id"
        - name: "name"
---
#json task yaml
name: "json_data"
steps:
  - name: "transactions"
    type: "json"
    options:
      dbtable: "account.transactions"
    schema:
      fields:
        - name: "account_id"
        - name: "_txn_name"
          generator:
            options:
              omit: true
        - name: "cusotmer_details"
          schema:
            fields:
              name: "name"
              generator:
                type: "sql"
                options:
                  sql: "_txn_name"

---
#plan yaml
name: "customer_create_plan"
description: "Create customers in JDBC"
tasks:
  - name: "postgres_data"
    dataSourceName: "my_postgres"
  - name: "json_data"
    dataSourceName: "my_json"

sinkOptions:
  foreignKeys:
    - - "my_postgres.accounts.account_id,name"
      - - "my_json.transactions.account_id,_txn_name"
      - []