Skip to content

Foreign Keys/Relationships

Multiple data source foreign key example

Foreign keys can be defined to represent the relationships between datasets where values are required to match for particular fields.

Single field

Define a field in one data source to match against another field.
The example below shows a Postgres data source with two tables, accounts and transactions, that are linked by a foreign key on account_id.

var postgresAcc = postgres("my_postgres", "jdbc:...")
  .table("public.accounts")
  .schema(
    field().name("account_id"),
    field().name("name"),
    ...
  );
var postgresTxn = postgres(postgresAcc)
  .table("public.transactions")
  .schema(
    field().name("account_id"),
    field().name("full_name"),
    ...
  );

plan().addForeignKeyRelationship(
  postgresAcc, "account_id",
  List.of(Map.entry(postgresTxn, "account_id"))
);
val postgresAcc = postgres("my_postgres", "jdbc:...")
  .table("public.accounts")
  .schema(
    field.name("account_id"),
    field.name("name"),
    ...
  )
val postgresTxn = postgres(postgresAcc)
  .table("public.transactions")
  .schema(
    field.name("account_id"),
    field.name("full_name"),
    ...
  )

plan.addForeignKeyRelationship(
  postgresAcc, "account_id",
  List(postgresTxn -> "account_id")
)
---
name: "postgres_data"
steps:
  - name: "accounts"
    type: "postgres"
    options:
      dbtable: "account.accounts"
    schema:
      fields:
        - name: "account_id"
        - name: "name"
  - name: "transactions"
    type: "postgres"
    options:
      dbtable: "account.transactions"
    schema:
      fields:
        - name: "account_id"
        - name: "full_name"
---
name: "customer_create_plan"
description: "Create customers in JDBC"
tasks:
  - name: "postgres_data"
    dataSourceName: "my_postgres"

sinkOptions:
  foreignKeys:
    - - "my_postgres.accounts.account_id"
      - - "my_postgres.transactions.account_id"
      - []

Multiple fields

You may have a scenario where multiple fields need to be aligned. Using the same example, we want account_id and name from accounts to match account_id and full_name in transactions, respectively.

var postgresAcc = postgres("my_postgres", "jdbc:...")
  .table("public.accounts")
  .schema(
    field().name("account_id"),
    field().name("name"),
    ...
  );
var postgresTxn = postgres(postgresAcc)
  .table("public.transactions")
  .schema(
    field().name("account_id"),
    field().name("full_name"),
    ...
  );

plan().addForeignKeyRelationship(
  postgresAcc, List.of("account_id", "name"),
  List.of(Map.entry(postgresTxn, List.of("account_id", "full_name")))
);
val postgresAcc = postgres("my_postgres", "jdbc:...")
  .table("public.accounts")
  .schema(
    field.name("account_id"),
    field.name("name"),
    ...
  )
val postgresTxn = postgres(postgresAcc)
  .table("public.transactions")
  .schema(
    field.name("account_id"),
    field.name("full_name"),
    ...
  )

plan.addForeignKeyRelationship(
  postgresAcc, List("account_id", "name"),
  List(postgresTxn -> List("account_id", "full_name"))
)
---
name: "postgres_data"
steps:
  - name: "accounts"
    type: "postgres"
    options:
      dbtable: "account.accounts"
    schema:
      fields:
        - name: "account_id"
        - name: "name"
  - name: "transactions"
    type: "postgres"
    options:
      dbtable: "account.transactions"
    schema:
      fields:
        - name: "account_id"
        - name: "full_name"
---
name: "customer_create_plan"
description: "Create customers in JDBC"
tasks:
  - name: "postgres_data"
    dataSourceName: "my_postgres"

sinkOptions:
  foreignKeys:
    - - "my_postgres.accounts.account_id,name"
      - - "my_postgres.transactions.account_id,full_name"
      - []

Transformed field

Scenarios exist where there are relationships defined by certain transformations being applied to the source data.

For example, there may be accounts created with a field account_number that contains records like 123456. Then another data source contains account_id which is a concatenation of ACC with account_number to have values like ACC123456.

var postgresAcc = postgres("my_postgres", "jdbc:...")
  .table("public.accounts")
  .schema(
    field().name("account_number"),
    field().name("name"),
    ...
  );
var jsonTask = json("my_json", "/tmp/json")
  .schema(
    field().name("account_id").sql("CONCAT('ACC', account_number)"),
    field().name("account_number").omit(true),  // using this field for intermediate calculation, not included in final result with omit=true
    ...
  );

plan().addForeignKeyRelationship(
  postgresAcc, List.of("account_number"),
  List.of(Map.entry(jsonTask, List.of("account_number")))
);
val postgresAcc = postgres("my_postgres", "jdbc:...")
  .table("public.accounts")
  .schema(
    field.name("account_number"),
    field.name("name"),
    ...
  )
val jsonTask = json("my_json", "/tmp/json")
  .schema(
    field.name("account_id").sql("CONCAT('ACC', account_number)"),
    field.name("account_number").omit(true),  // using this field for intermediate calculation, not included in final result with omit=true
    ...
  )

plan.addForeignKeyRelationship(
  postgresAcc, List("account_number"),
  List(jsonTask -> List("account_number"))
)
---
#postgres task yaml
name: "postgres_data"
steps:
  - name: "accounts"
    type: "postgres"
    options:
      dbtable: "account.accounts"
    schema:
      fields:
        - name: "account_number"
        - name: "name"
---
#json task yaml
name: "json_data"
steps:
  - name: "transactions"
    type: "json"
    options:
      dbtable: "account.transactions"
    schema:
      fields:
        - name: "account_id"
          generator:
            options:
              sql: "CONCAT('ACC', account_number)"
        - name: "account_number"
          generator:
            options:
              omit: true

---
#plan yaml
name: "customer_create_plan"
description: "Create customers in JDBC"
tasks:
  - name: "postgres_data"
    dataSourceName: "my_postgres"
  - name: "json_data"
    dataSourceName: "my_json"

sinkOptions:
  foreignKeys:
    - - "my_postgres.accounts.account_number"
      - - "my_json.transactions.account_number"
      - []

Nested field

Your schema structure can have nested fields which can also be referenced as foreign keys. But to do so, you need to create a proxy field that gets omitted from the final saved data.

In the example below, the nested customer_details.name field inside the json task needs to match with name from postgres. A new field in the json called _txn_name is used as a temporary field to facilitate the foreign key definition.

var postgresAcc = postgres("my_postgres", "jdbc:...")
  .table("public.accounts")
  .schema(
    field().name("account_id"),
    field().name("name"),
    ...
  );
var jsonTask = json("my_json", "/tmp/json")
  .schema(
    field().name("account_id"),
    field().name("customer_details")
      .schema(
        field().name("name").sql("_txn_name"), // nested field will get value from '_txn_name'
        ...
      ),
    field().name("_txn_name").omit(true)       // value will not be included in output
  );

plan().addForeignKeyRelationship(
  postgresAcc, List.of("account_id", "name"),
  List.of(Map.entry(jsonTask, List.of("account_id", "_txn_name")))
);
val postgresAcc = postgres("my_postgres", "jdbc:...")
  .table("public.accounts")
  .schema(
    field.name("account_id"),
    field.name("name"),
    ...
  )
val jsonTask = json("my_json", "/tmp/json")
  .schema(
    field.name("account_id"),
    field.name("customer_details")
      .schema(
        field.name("name").sql("_txn_name"), // nested field will get value from '_txn_name'
        ...
      ),
    field.name("_txn_name").omit(true)       // value will not be included in output
  )

plan.addForeignKeyRelationship(
  postgresAcc, List("account_id", "name"),
  List(jsonTask -> List("account_id", "_txn_name"))
)
---
#postgres task yaml
name: "postgres_data"
steps:
  - name: "accounts"
    type: "postgres"
    options:
      dbtable: "account.accounts"
    schema:
      fields:
        - name: "account_id"
        - name: "name"
---
#json task yaml
name: "json_data"
steps:
  - name: "transactions"
    type: "json"
    options:
      dbtable: "account.transactions"
    schema:
      fields:
        - name: "account_id"
        - name: "_txn_name"
          generator:
            options:
              omit: true
        - name: "customer_details"
          schema:
            fields:
              - name: "name"
                generator:
                  type: "sql"
                  options:
                    sql: "_txn_name"

---
#plan yaml
name: "customer_create_plan"
description: "Create customers in JDBC"
tasks:
  - name: "postgres_data"
    dataSourceName: "my_postgres"
  - name: "json_data"
    dataSourceName: "my_json"

sinkOptions:
  foreignKeys:
    - - "my_postgres.accounts.account_id,name"
      - - "my_json.transactions.account_id,_txn_name"
      - []