RabbitMQ is a message broker that lets producers (the senders of messages) and consumers (the receivers of messages) exchange large volumes of data asynchronously, in near real time, and with high performance. RabbitMQ supports AMQP (Advanced Message Queuing Protocol), an open-standard application-layer protocol.
The main reasons to employ RabbitMQ include the following:
- You can improve the performance of the applications using an asynchronous approach.
- It lets you decouple and reduce dependencies between services, microservices, and applications with the help of a data message mediator, meaning that there is no need for producers and consumers of exchanged data to know each other.
- It allows long-running processing of the sent data, with the results delivered later through a response queue.
- It helps you migrate from a monolith to microservices, where the microservices exchange data via RabbitMQ in a decoupled and asynchronous way.
- It offers reliability and resilience by making it possible for messages to be stored and forwarded. A message can be redelivered until it has been processed.
- Message queueing is the key to scaling your application. As the workload increases, you will only have to add more workers to handle the queues faster.
- It works well with data streaming applications.
- It is beneficial for IoT applications.
- It is well suited to communication between bots.
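The decoupling and scaling points above can be illustrated without a broker. What follows is a minimal in-process sketch in Python, where the standard library's `queue.Queue` stands in for a RabbitMQ queue. This is an assumption made purely for illustration: a real setup would talk to the broker through a client library or the IRIS adapters. The names `process_all` and `handle` are hypothetical.

```python
import queue
import threading


def process_all(messages, worker_count, handle):
    """Push messages onto a queue and let worker_count threads drain it.

    The producer only knows the queue, and so do the workers.
    """
    q = queue.Queue()
    results = []
    lock = threading.Lock()

    def worker():
        while True:
            item = q.get()
            if item is None:          # sentinel: no more work for this worker
                q.task_done()
                return
            outcome = handle(item)
            with lock:
                results.append(outcome)
            q.task_done()             # loosely comparable to acknowledging a message

    threads = [threading.Thread(target=worker) for _ in range(worker_count)]
    for t in threads:
        t.start()
    for m in messages:                # the producer side
        q.put(m)
    for _ in threads:                 # one sentinel per worker
        q.put(None)
    q.join()
    for t in threads:
        t.join()
    return results


if __name__ == "__main__":
    readings = [68.2, 70.1, 71.5, 69.9]
    to_celsius = lambda f: round((f - 32) * 5 / 9, 1)
    # Handling a higher load only means raising worker_count; the producer code is unchanged.
    print(sorted(process_all(readings, worker_count=2, handle=to_celsius)))
```

The order of the results depends on thread scheduling, which is why the usage example sorts them before printing.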
RabbitMQ basic concepts
We will use a temperature monitor as an example to explain the RabbitMQ concepts:
There are a few existing types of possible exchanges (source: https://www.cloudamqp.com/blog/part1-rabbitmq-for-beginners-what-is-rabbitmq.html):
- Direct: The message is routed to the queues whose binding key is the exact match to the routing key of the message. For example, if the queue is bound to the exchange with the binding key pdfprocess, a message published to the exchange with a routing key pdfprocess will be routed to that queue.
- Fanout: A fanout exchange routes messages to all the queues bound to it.
- Topic: The topic exchange does a wildcard match between the routing key and the routing pattern specified in the binding.
- Headers: A headers exchange uses the message header attributes for routing.
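The four routing rules above can be sketched as a small pure-Python model. It is not RabbitMQ code and not part of the sample: `route` and `topic_matches` are hypothetical helpers, the queue names are invented, and the headers case is simplified to "every bound header must match". For topic patterns the sketch follows the AMQP convention that `*` stands for exactly one word and `#` for zero or more words.

```python
def topic_matches(pattern, routing_key):
    """AMQP-style topic match: '*' = exactly one word, '#' = zero or more words."""
    p = pattern.split(".")
    k = routing_key.split(".")

    def match(i, j):
        if i == len(p):
            return j == len(k)
        if p[i] == "#":
            # '#' may absorb zero words (advance the pattern) or one more word (advance the key)
            return match(i + 1, j) or (j < len(k) and match(i, j + 1))
        if j < len(k) and (p[i] == "*" or p[i] == k[j]):
            return match(i + 1, j + 1)
        return False

    return match(0, 0)


def route(exchange_type, bindings, routing_key="", headers=None):
    """Return the names of the queues a message would reach.

    bindings is a list of (queue_name, binding) pairs, where binding is a
    key/pattern string, or a dict of header values for a headers exchange.
    """
    headers = headers or {}
    queues = []
    for queue_name, binding in bindings:
        if exchange_type == "fanout":
            hit = True                                  # binding key is ignored
        elif exchange_type == "direct":
            hit = binding == routing_key                # exact match
        elif exchange_type == "topic":
            hit = topic_matches(binding, routing_key)   # wildcard match
        elif exchange_type == "headers":
            hit = all(headers.get(name) == value for name, value in binding.items())
        else:
            raise ValueError("unknown exchange type: " + exchange_type)
        if hit:
            queues.append(queue_name)
    return queues


if __name__ == "__main__":
    print(route("direct", [("pdf_queue", "pdfprocess"), ("img_queue", "imgprocess")], "pdfprocess"))
    # The binding pattern "temperature.*" is an assumption, not taken from the sample.
    print(route("topic", [("temperature_queue", "temperature.*")], "temperature.current"))
```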
Install an example of IRIS and RabbitMQ running a temperature monitoring case
To learn more about IRIS and RabbitMQ, we will play with a sample from Open Exchange:
- Go to https://openexchange.intersystems.com/package/rabbit-iris-sample.
- For Docker installation, clone/git pull the repo into any local directory:
git clone https://github.com/yurimarx/rabbit-iris-sample.git
- Open the terminal in this directory and run the following:
docker-compose build
- Run the IRIS container with your project:
docker-compose up -d
- For ZPM (IPM) installation, use the line below:
USER>zpm "install rabbit-iris-sample"
Configure the RabbitMQ
1. Proceed to the RabbitMQ console (http://localhost:15672) and log in with the Username "guest" and Password "guest" to see the initial page:
Start the IRIS production and see the results
1. Proceed to the sample production (http://localhost:52795/csp/user/EnsPortal.ProductionConfig.zen?PRODUCTIO...) and click the button “Start”:
2. The production has started successfully:
3. At this point, move to the [SQL](http://localhost:52795/csp/sys/exp/%25CSP.UI.Portal.SQL.Home.zen?$NAMESPACE=USER&$NAMESPACE=USER) to query the temperature table and see the results:
InterSystems IRIS RabbitMQ (AMQP) services, operations, and message types to consume and produce messages
Overview
- The Business Service GetCurrentTemperature uses the Inbound Adapter dc.rabbit.TemperatureInboundAdapter to get the current temperature for a latitude/longitude location. It consumes the external API (https://api.open-meteo.com/v1/forecast?latitude="_..Latitude_"&longitude="_..Longitude_"&current=temperature_2m&temperature_unit=fahrenheit&forecast_days=1) and sends the result to SendTemperatureOperation.
- The SendTemperatureOperation receives the current temperature and utilizes the EnsLib.RabbitMQ.OutboundAdapter to publish it on the RabbitMQ topic "Temperature".
- The RabbitTemperatureClientService consumes the RabbitMQ "Temperature" topic, gets the temperature data, and sends it to RabbitTemperatureOperation.
- The RabbitTemperatureOperation receives the temperature data and saves it in the IRIS database in the table dc.rabbit.Temperature.
The temperature Inbound Adapter
Inbound adapters are used by Business Services to connect to external data sources, APIs, file systems, web services, and so on. In our case, the Inbound Adapter connects to an external API to get the temperature for a latitude/longitude location. Check out the source code comments below:
/// Extends the Ens.InboundAdapter to create a new custom adapter
Class dc.rabbit.TemperatureInboundAdapter Extends Ens.InboundAdapter
{
Property Latitude As %String;
Property Longitude As %String;
Property SSLConfig As %String;
/// To access HTTPS endpoints
Parameter SERVER = "api.open-meteo.com";
/// Lets the user configure the SSL configuration and the Latitude and Longitude settings
Parameter SETTINGS = "SSLConfig:Basic:sslConfigSelector,Latitude:Basic,Longitude:Basic";
/// Validates the configuration before the service using this adapter starts
Method OnInit() As %Status
{
    // ObjectScript evaluates operators strictly left to right, so each comparison gets its own parentheses
    If ((..SSLConfig = "") || (..Latitude = "") || (..Longitude = "")) {
        Return $$$ERROR(5001, "SSLConfig, Latitude, and Longitude are required")
    }
    Quit $$$OK
}
/// Method with the business implementation
Method OnTask() As %Status
{
    If ((..SSLConfig = "") || (..Latitude = "") || (..Longitude = "")) {
        Return $$$OK
    }
    Set tSC = $$$OK
    // HTTP Request
    #dim httprequest as %Net.HttpRequest
    #dim httpResponse as %Net.HttpResponse
    Try {
        Set httprequest = ##class(%Net.HttpRequest).%New()
        Do httprequest.ServerSet(..#SERVER)
        Do httprequest.SSLConfigurationSet(..SSLConfig)
        // URL to be requested
        Set requestString = "/v1/forecast?latitude="_..Latitude_"&longitude="_..Longitude_"&current=temperature_2m&temperature_unit=fahrenheit&forecast_days=1"
        // Requests the data from the temperature API
        Do httprequest.Get(requestString)
        Set httpResponse = httprequest.HttpResponse
        // If the status is not OK, throws an error
        If (httpResponse.StatusCode '= 200) {
            $$$ThrowStatus($$$ERROR(5001, "HTTP StatusCode = "_httpResponse.StatusCode))
        }
        // Gets the response and creates a dc.rabbit.TemperatureMessage to send to the Business Operation configured to receive the message
        Set apiResult = {}.%FromJSON(httpResponse.Data)
        Set temperature = ##class(dc.rabbit.TemperatureMessage).%New()
        Set temperature.Latitude = ..Latitude
        Set temperature.Longitude = ..Longitude
        Set temperature.Temperature = apiResult.current."temperature_2m"
        Set temperature.TemperatureDate = apiResult.current.time
        $$$ThrowOnError(..BusinessHost.ProcessInput(temperature))
    } Catch ex {
        Do ex.Log()
        Set tSC = ex.AsStatus()
    }
    // Gets the current temperature again after the call interval configured in the service using this adapter (polling interval)
    Set ..BusinessHost.%WaitForNextCallInterval=1
    Quit tSC
}
}
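To make the data flow of the adapter easier to follow outside IRIS, here is a minimal Python sketch of its two core steps: building the request URL and mapping the JSON response to the four message fields. It is an illustration, not the sample's code. The function names are hypothetical, and the sample payload is invented and reduced to the two fields the adapter reads (`current.time` and `current.temperature_2m`).

```python
import json
from urllib.parse import urlencode

SERVER = "api.open-meteo.com"


def build_forecast_url(latitude, longitude):
    """Build the same request the adapter issues, with the query string encoded."""
    query = urlencode({
        "latitude": latitude,
        "longitude": longitude,
        "current": "temperature_2m",
        "temperature_unit": "fahrenheit",
        "forecast_days": 1,
    })
    return "https://" + SERVER + "/v1/forecast?" + query


def to_temperature_message(latitude, longitude, response_body):
    """Map the API response to the four fields of dc.rabbit.TemperatureMessage."""
    api_result = json.loads(response_body)
    current = api_result["current"]
    return {
        "Latitude": latitude,
        "Longitude": longitude,
        "Temperature": current["temperature_2m"],
        "TemperatureDate": current["time"],
    }


if __name__ == "__main__":
    # Illustrative payload only; it is not real API output.
    sample = '{"current": {"time": "2024-01-01T12:00", "temperature_2m": 71.3}}'
    print(build_forecast_url("40.71", "-74.01"))
    print(to_temperature_message("40.71", "-74.01", sample))
```

Letting `urlencode` assemble the query string avoids hand-written `&` separators, which are easy to corrupt when a URL is copied between tools.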
The service dc.rabbit.TemperatureService
This service uses the dc.rabbit.TemperatureInboundAdapter to get the current temperature and sends it to the SendTemperatureOperation (pay attention to the following comments):
/// Service to get the current temperature and send it to the configured operation
Class dc.rabbit.TemperatureService Extends Ens.BusinessService
{
Property Adapter As dc.rabbit.TemperatureInboundAdapter;
/// Uses the TemperatureInboundAdapter
Parameter ADAPTER = "dc.rabbit.TemperatureInboundAdapter";
/// Receives the temperature data from the inbound adapter and sends it to the operation
Method OnProcessInput(pInput As dc.rabbit.TemperatureMessage, pOutput As %RegisteredObject) As %Status
{
    // Creates a RabbitMQ message that carries the temperature data as its content
    #dim rabbitMessage As EnsLib.RabbitMQ.Message
    Set rabbitMessage = ##class(EnsLib.RabbitMQ.Message).%New()
    Do pInput.%JSONExportToString(.content)
    // Sets the body of the message with the temperature data
    Do rabbitMessage.SetEncodedContent(content)
    Set rabbitMessage.appId = "IRIS"
    Set rabbitMessage.exchange = "temperature" // Sets the RabbitMQ exchange that will receive the message
    Set rabbitMessage.routingKey = "temperature.current" // Sets the routing key the exchange uses to route the message to the "Temperature" topic
    Return ..SendRequestAsync("SendTemperatureOperation", rabbitMessage) // Sends the message to the operation, which publishes it to RabbitMQ
}
}
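The shape of the message assembled in `OnProcessInput` can be pictured as an envelope: routing metadata on the outside and a JSON string as the body. The Python sketch below models that idea with a plain dict. It is a simplified stand-in, not the `EnsLib.RabbitMQ.Message` API. The function name `build_rabbit_message` is hypothetical, and the default values are the ones used in the service code above.

```python
import json


def build_rabbit_message(temperature, exchange="temperature",
                         routing_key="temperature.current", app_id="IRIS"):
    """Wrap a temperature record in a dict shaped like the message the service creates."""
    return {
        "appId": app_id,
        "exchange": exchange,          # where the broker receives the message
        "routingKey": routing_key,     # how the exchange decides which queues get it
        "encodedContent": json.dumps(temperature, sort_keys=True),  # the body, as a JSON string
    }


if __name__ == "__main__":
    record = {"Latitude": "40.71", "Longitude": "-74.01",
              "Temperature": 71.3, "TemperatureDate": "2024-01-01T12:00"}
    print(build_rabbit_message(record))
```

Keeping the routing data outside the body means the broker can route the message without parsing the JSON payload.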
The Business Operation SendTemperatureOperation
It is a business operation provided by IRIS (EnsLib.RabbitMQ.Operation) that publishes a message on the RabbitMQ topic. It receives an EnsLib.RabbitMQ.Message and sends it to the RabbitMQ server configured in the RabbitMQ settings:
The message destination is defined in the source code that created the message (inside the GetCurrentTemperature service):
The properties exchange and routing key are needed to route the message to the right topic.
The Business Service RabbitTemperatureClientService
This business service subscribes to a RabbitMQ topic, configured in the RabbitMQ settings, to receive messages and dispatch them to the business operations or business processes configured in Target Config Names:
The Business Operation dc.rabbit.RabbitTemperatureOperation
This Business Operation receives a message from RabbitTemperatureClientService and saves it in the IRIS Database:
Class dc.rabbit.RabbitTemperatureOperation Extends Ens.BusinessOperation
{
Property Adapter As Ens.OutboundAdapter;
Parameter ADAPTER = "Ens.OutboundAdapter";
Method ProcessRabbitTemperature(pInput As EnsLib.RabbitMQ.Message, Output pOutput As %RegisteredObject) As %Status
{
    Set content = {}.%FromJSON(pInput.encodedContent)
    Set temperature = ##class(Temperature).%New()
    Set temperature.Latitude = content.Latitude
    Set temperature.Longitude = content.Longitude
    Set temperature.Temperature = content.Temperature
    Set temperature.TemperatureDate = content.TemperatureDate
    Set tSC = temperature.%Save()
    Return tSC
}
XData MessageMap
{
<MapItems>
    <MapItem MessageType="EnsLib.RabbitMQ.Message">
        <Method>ProcessRabbitTemperature</Method>
    </MapItem>
</MapItems>
}
}
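The consuming side follows the same decode-then-persist pattern in any language. As a rough Python sketch, an in-memory SQLite table stands in for the IRIS table dc.rabbit.Temperature. That substitution, the column types, and the function names are assumptions made for illustration only.

```python
import json
import sqlite3


def create_table(conn):
    """Create a stand-in for the dc.rabbit.Temperature table."""
    conn.execute(
        "CREATE TABLE IF NOT EXISTS Temperature ("
        "Latitude TEXT, Longitude TEXT, Temperature REAL, TemperatureDate TEXT)"
    )


def process_rabbit_temperature(conn, encoded_content):
    """Decode the JSON body and insert one row, mirroring ProcessRabbitTemperature."""
    content = json.loads(encoded_content)
    conn.execute(
        "INSERT INTO Temperature (Latitude, Longitude, Temperature, TemperatureDate) "
        "VALUES (?, ?, ?, ?)",
        (content["Latitude"], content["Longitude"],
         content["Temperature"], content["TemperatureDate"]),
    )
    conn.commit()


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    create_table(conn)
    body = json.dumps({"Latitude": "40.71", "Longitude": "-74.01",
                       "Temperature": 71.3, "TemperatureDate": "2024-01-01T12:00"})
    process_rabbit_temperature(conn, body)
    print(conn.execute("SELECT * FROM Temperature").fetchall())
```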
The EnsLib.RabbitMQ.Message class
This IRIS class carries the message content and specifies which RabbitMQ exchange and routing key are used when the message is published or received:
/// Receives the temperature data from the inbound adapter and sends it to the operation
Method OnProcessInput(pInput As dc.rabbit.TemperatureMessage, pOutput As %RegisteredObject) As %Status
{
    // Creates a RabbitMQ message that carries the temperature data as its content
    #dim rabbitMessage As EnsLib.RabbitMQ.Message
    Set rabbitMessage = ##class(EnsLib.RabbitMQ.Message).%New()
    Do pInput.%JSONExportToString(.content)
    // Sets the body of the message with the temperature data
    Do rabbitMessage.SetEncodedContent(content)
    Set rabbitMessage.appId = "IRIS"
    Set rabbitMessage.exchange = "temperature" // Sets the RabbitMQ exchange that will receive the message
    Set rabbitMessage.routingKey = "temperature.current" // Sets the routing key the exchange uses to route the message to the "Temperature" topic
    Return ..SendRequestAsync("SendTemperatureOperation", rabbitMessage) // Sends the message to the operation, which publishes it to RabbitMQ
}
More information about this class can be found below:
https://docs.intersystems.com/iris20243/csp/docbook/Doc.View.cls?KEY=EME...
More information
Additional details about the RabbitMQ adapters and utility classes can be found in the following resources:
- Using RabbitMQ in productions: https://docs.intersystems.com/iris20243/csp/docbook/DocBook.UI.Page.cls?...
- Using RabbitMQ in any part of your source code: https://docs.intersystems.com/iris20243/csp/docbook/DocBook.UI.Page.cls?...
- My sample: https://openexchange.intersystems.com/package/rabbit-iris-sample