RabbitMQ
Creating a data generator for RabbitMQ. You will build a Docker image that can populate data in RabbitMQ for the queues/topics you configure.
Requirements
- 10 minutes
- Git
- Gradle
- Docker
Get Started
First, we will clone the data-caterer-example repo which will already have the base project setup required.
If you already have a RabbitMQ instance running, you can skip to this step.
RabbitMQ Setup
Next, let's make sure you have an instance of RabbitMQ up and running in your local environment. This will make it easy for us to iterate and check our changes.
Open up localhost:15672/#/queues and login with guest:guest. Create a new queue with name accounts.
Plan Setup
Create a file depending on which interface you want to use.
- Java: src/main/java/io/github/datacatering/plan/MyAdvancedRabbitMQJavaPlan.java
- Scala: src/main/scala/io/github/datacatering/plan/MyAdvancedRabbitMQPlan.scala
- YAML: docker/data/custom/plan/my-rabbitmq.yaml
In docker/data/custom/plan/my-rabbitmq.yaml:
- Click on Connection towards the top of the screen
- For connection name, set to my_rabbitmq
- Click on Select data source type.. and select RabbitMQ
- Set URL as amqp://host.docker.internal:5672
- Optionally, we could set the JNDI destination (queue or topic), but we would have to create a new connection for each queue or topic
- Click on Create
- You should see your connection my_rabbitmq show under Existing connections
- Click on Home towards the top of the screen
- Set plan name to my_rabbitmq_plan
- Set task name to rabbitmq_task
- Click on Select data source.. and select my_rabbitmq
This class is where we define all of our configurations for generating data. There are helper variables and methods defined to make it simple and easy to use.
Connection Configuration
Within our class, we can start by defining the connection properties to connect to RabbitMQ.
var accountTask = rabbitmq(
"my_rabbitmq", //name
  "amqp://host.docker.internal:5672", //url
Map.of() //optional additional connection options
);
Additional connection options can be found here.
val accountTask = rabbitmq(
"my_rabbitmq", //name
  "amqp://host.docker.internal:5672", //url
Map() //optional additional connection options
)
Additional connection options can be found here.
In docker/data/custom/application.conf
:
jms {
    rabbitmq {
        # RMQConnectionFactory comes from the RabbitMQ JMS client library
        connectionFactory = "com.rabbitmq.jms.admin.RMQConnectionFactory"
        connectionFactory = ${?RABBITMQ_CONNECTION_FACTORY}
        url = "amqp://host.docker.internal:5672"
        url = ${?RABBITMQ_URL}
        # guest:guest matches the default docker-compose RabbitMQ credentials
        user = "guest"
        user = ${?RABBITMQ_USER}
        password = "guest"
        password = ${?RABBITMQ_PASSWORD}
    }
}
- We have already created the connection details in this step
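Whichever interface you use, double-check the URL scheme: it is amqp, not ampq. As a quick sanity check (standalone Java, not part of the Data Caterer API), the connection URL can be parsed with java.net.URI to confirm the scheme, host, and port before wiring it in:

```java
import java.net.URI;

public class AmqpUrlCheck {
    public static void main(String[] args) {
        // Parse the connection URL used above; the scheme is "amqp"
        URI url = URI.create("amqp://host.docker.internal:5672");
        System.out.println(url.getScheme()); // amqp
        System.out.println(url.getHost());   // host.docker.internal
        System.out.println(url.getPort());   // 5672
    }
}
```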
Schema
Let's create a task for inserting data into the accounts queue that was already created for us in this step.
Trimming the connection details to work with the docker-compose RabbitMQ, we have a base RabbitMQ connection to define the JNDI destination we will publish to. Let's define each field along with its corresponding data type. You will notice that the text fields do not have a data type defined. This is because the default data type is StringType.
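Before diving into the full schema, it helps to see what a regex-backed field produces: the account_id field below uses the pattern ACC[0-9]{8}. This standalone sketch (plain Java, not Data Caterer code) generates a value of that shape and validates it against the pattern:

```java
import java.util.Random;
import java.util.regex.Pattern;

public class AccountIdSketch {
    // Build a value in the shape the regex ACC[0-9]{8} describes:
    // the literal prefix "ACC" followed by exactly 8 digits
    static String randomAccountId(Random rnd) {
        StringBuilder sb = new StringBuilder("ACC");
        for (int i = 0; i < 8; i++) {
            sb.append(rnd.nextInt(10));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        String id = randomAccountId(new Random());
        System.out.println(id);
        System.out.println(Pattern.matches("ACC[0-9]{8}", id)); // true
    }
}
```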
var rabbitmqTask = rabbitmq("my_rabbitmq", "amqp://host.docker.internal:5672")
        .destination("accounts")
        .fields(
                //field().name("partition").type(IntegerType.instance()), can define JMS priority here
                field().messageHeaders( //set message properties via headers field
                        field().messageHeader("account-id", "body.account_id"),
                        field().messageHeader("updated", "body.details.updated_by.time")
                )
        ).fields(
                field().messageBody(
                        field().name("account_id").regex("ACC[0-9]{8}"),
                        field().name("year").type(IntegerType.instance()).min(2021).max(2023),
                        field().name("amount").type(DoubleType.instance()),
                        field().name("details")
                                .fields(
                                        field().name("name").expression("#{Name.name}"),
                                        field().name("first_txn_date").type(DateType.instance()).sql("ELEMENT_AT(SORT_ARRAY(body.transactions.txn_date), 1)"),
                                        field().name("updated_by")
                                                .fields(
                                                        field().name("user"),
                                                        field().name("time").type(TimestampType.instance())
                                                )
                                ),
                        field().name("transactions").type(ArrayType.instance())
                                .fields(
                                        field().name("txn_date").type(DateType.instance()).min(Date.valueOf("2021-01-01")).max(Date.valueOf("2021-12-31")),
                                        field().name("amount").type(DoubleType.instance())
                                )
                )
        )
        .count(count().records(10));
val rabbitmqTask = rabbitmq("my_rabbitmq", "amqp://host.docker.internal:5672")
.destination("accounts")
.fields(
//field.name("partition").type(IntegerType), can define JMS priority here
field.messageHeaders( //set message properties via headers field
field.messageHeader("account-id", "body.account_id"),
field.messageHeader("updated", "body.details.updated_by.time"),
)
)
.fields(
field.messageBody(
field.name("account_id").regex("ACC[0-9]{8}"),
field.name("year").`type`(IntegerType).min(2021).max(2023),
field.name("account_status").oneOf("open", "closed", "suspended", "pending"),
field.name("amount").`type`(DoubleType),
field.name("details").`type`(StructType)
.fields(
field.name("name").expression("#{Name.name}"),
field.name("first_txn_date").`type`(DateType).sql("ELEMENT_AT(SORT_ARRAY(body.transactions.txn_date), 1)"),
field.name("updated_by").`type`(StructType)
.fields(
field.name("user"),
field.name("time").`type`(TimestampType),
),
),
field.name("transactions").`type`(ArrayType)
.fields(
field.name("txn_date").`type`(DateType).min(Date.valueOf("2021-01-01")).max(Date.valueOf("2021-12-31")),
field.name("amount").`type`(DoubleType),
)
)
)
.count(count.records(10))
In docker/data/custom/task/rabbitmq/rabbitmq-task.yaml:
name: "rabbitmq_task"
steps:
  - name: "rabbitmq_account"
    options:
      destinationName: "accounts"
    fields:
      - name: "messageBody"
        fields:
          - name: "account_id"
          - name: "year"
            type: "int"
            options:
              min: "2021"
              max: "2023"
          - name: "amount"
            type: "double"
            options:
              min: "10.0"
              max: "100.0"
          - name: "details"
            fields:
              - name: "name"
              - name: "first_txn_date"
                type: "date"
                options:
                  sql: "ELEMENT_AT(SORT_ARRAY(body.transactions.txn_date), 1)"
              - name: "updated_by"
                fields:
                  - name: "user"
                  - name: "time"
                    type: "timestamp"
          - name: "transactions"
            type: "array"
            fields:
              - name: "txn_date"
                type: "date"
              - name: "amount"
                type: "double"
      - name: "messageHeaders"
        fields:
          - name: "account-id"
            options:
              sql: "body.account_id"
          - name: "updated"
            options:
              sql: "body.details.updated_by.time"
- Click on Generation and tick the Manual checkbox
- Click on + Field
- Add name as key
- Click on Select data type and select string
- Click + next to data type and select Sql. Then enter body.account_id
- Click on + Field and add name as messageBody
- Click on Select data type and select struct
- Click on + Field under messageBody and add name as account_id
- Add additional fields under messageBody with your own metadata
- Click on + Field and add name as messageHeaders
- Click on Select data type and select struct
- Click on + Field under messageHeaders and add name as account_id
- Add additional fields under messageHeaders with your own metadata
Fields
The schema defined for RabbitMQ has a format that needs to be followed, as noted above. Specifically, the required fields are:

- messageBody

The other fields are optional:

- partition - refers to the JMS priority of the message
- messageHeaders - refers to JMS message properties
Message Headers
If your messages contain headers, you can follow the details below on generating header values. These can be based off values contained within your message body or could be static values, just like any other generated field. The main restriction imposed here is that the key of the message header is static and the value has to be a valid SQL expression.
In docker/data/custom/task/rabbitmq/rabbitmq-task.yaml:
- Click on + Field and add name as messageHeaders
- Click on Select data type and select struct
- Click on + Field under messageHeaders and add name as account_id
- Add additional fields under messageHeaders with your own metadata
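Conceptually, each header pairs a static key with an expression that is evaluated against the generated message, so "body.account_id" resolves to whatever value was generated for that field. A rough sketch of that lookup in plain Java (resolvePath is an illustrative helper, not part of Data Caterer's API):

```java
import java.util.Map;

public class HeaderSketch {
    // Walk a dotted path like "body.account_id" through nested maps,
    // mirroring how a header value references generated body fields
    @SuppressWarnings("unchecked")
    static Object resolvePath(Map<String, Object> root, String path) {
        Object current = root;
        for (String part : path.split("\\.")) {
            current = ((Map<String, Object>) current).get(part);
        }
        return current;
    }

    public static void main(String[] args) {
        Map<String, Object> message = Map.of(
                "body", Map.of(
                        "account_id", "ACC12345678",
                        "details", Map.of("updated_by", Map.of("time", "2023-01-01T00:00:00Z"))
                )
        );
        System.out.println(resolvePath(message, "body.account_id"));              // ACC12345678
        System.out.println(resolvePath(message, "body.details.updated_by.time")); // 2023-01-01T00:00:00Z
    }
}
```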
transactions
transactions is an array that contains an inner structure of txn_date and amount. The size of the generated array can be controlled via arrayMinLength and arrayMaxLength.
In docker/data/custom/task/rabbitmq/rabbitmq-task.yaml:
Warning
Inner field definition for array type is currently not supported from the UI. Will be added in the near future!
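The effect of those two options can be sketched as bounding a randomly chosen array length. This is an illustrative standalone sketch (the method and parameter names mirror the options, not Data Caterer internals):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

public class ArrayLengthSketch {
    // Pick a size uniformly in [arrayMinLength, arrayMaxLength], then fill
    // with amounts in the same 10.0-100.0 range used in the task above
    static List<Double> randomAmounts(int arrayMinLength, int arrayMaxLength, Random rnd) {
        int size = arrayMinLength + rnd.nextInt(arrayMaxLength - arrayMinLength + 1);
        List<Double> amounts = new ArrayList<>();
        for (int i = 0; i < size; i++) {
            amounts.add(10.0 + rnd.nextDouble() * 90.0);
        }
        return amounts;
    }

    public static void main(String[] args) {
        List<Double> txns = randomAmounts(1, 5, new Random());
        System.out.println(txns.size()); // always between 1 and 5 inclusive
    }
}
```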
details
details is another example of a nested schema structure, which itself contains a further nested structure in updated_by.

One thing to note here is that the first_txn_date field has a reference to the body.transactions array, where it will sort the array by txn_date and get the first element.
field().name("details")
.fields(
field().name("name").expression("#{Name.name}"),
    field().name("first_txn_date").type(DateType.instance()).sql("ELEMENT_AT(SORT_ARRAY(body.transactions.txn_date), 1)"),
field().name("updated_by")
.fields(
field().name("user"),
field().name("time").type(TimestampType.instance())
)
)
In docker/data/custom/task/rabbitmq/rabbitmq-task.yaml:
name: "rabbitmq_task"
steps:
  - name: "rabbitmq_account"
    options:
      destinationName: "accounts"
    fields:
      - name: "messageBody"
        fields:
          - name: "details"
            fields:
              - name: "name"
              - name: "first_txn_date"
                type: "date"
                options:
                  sql: "ELEMENT_AT(SORT_ARRAY(body.transactions.txn_date), 1)"
              - name: "updated_by"
                fields:
                  - name: "user"
                  - name: "time"
                    type: "timestamp"
- Click on + Field and add name as messageBody
- Click on Select data type and select struct
- Click on + Field under messageBody and add name as details
- Add additional fields under details with your own metadata
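The ELEMENT_AT(SORT_ARRAY(body.transactions.txn_date), 1) expression amounts to sorting the transaction dates ascending and taking the earliest one. In plain Java (a standalone sketch with made-up sample dates), that is:

```java
import java.sql.Date;
import java.util.List;

public class FirstTxnDateSketch {
    public static void main(String[] args) {
        List<Date> txnDates = List.of(
                Date.valueOf("2021-06-15"),
                Date.valueOf("2021-01-03"),
                Date.valueOf("2021-11-30")
        );
        // Sort ascending and take the first element,
        // like ELEMENT_AT(SORT_ARRAY(...), 1) in the SQL expression
        Date firstTxnDate = txnDates.stream().sorted().findFirst().orElseThrow();
        System.out.println(firstTxnDate); // 2021-01-03
    }
}
```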
Additional Configurations
At the end of data generation, a report gets generated that summarises the actions it performed. We can control the output folder of that report via configurations.
In docker/data/custom/application.conf:
- Click on Advanced Configuration towards the bottom of the screen
- Click on Folder and enter /tmp/data-caterer/report for Generated Reports Folder Path
Execute
To tell Data Caterer that we want to run with the configurations along with the rabbitmqTask, we have to call execute.
Run
Now we can run the class we just created via the ./run.sh script in the top-level directory of data-caterer-example.
- Click the Execute button at the top
- Progress updates will show in the bottom right corner
- Click on History at the top
- Check for your plan name and see the result summary
- Click on Report on the right side to see more details of what was executed
Your output should look like this.
You can check the message payload by clicking on Get Message(s). Once you copy the payload, you can run a command like echo "<payload>" | base64 -d (base64 -D on macOS) to see the actual message.
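The same decoding can be done programmatically with java.util.Base64 (the payload here is a made-up example, encoded in place so the snippet is self-contained):

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

public class PayloadDecode {
    public static void main(String[] args) {
        // Stand-in for a base64 payload copied from the RabbitMQ management UI
        String payload = Base64.getEncoder()
                .encodeToString("{\"account_id\":\"ACC12345678\"}".getBytes(StandardCharsets.UTF_8));
        // Decode it back to the original JSON message body
        String decoded = new String(Base64.getDecoder().decode(payload), StandardCharsets.UTF_8);
        System.out.println(decoded); // {"account_id":"ACC12345678"}
    }
}
```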
Also check the HTML report, found at docker/sample/report/index.html, that gets generated to get an overview of what was executed. Or view the sample report found here.