In this blog post, I’ll walk you through setting up Oracle Advanced Queuing (AQ) with WSO2 Micro Integrator (MI), based on my own experience.
Tested Products and versions
- WSO2 Product : WSO2 MI: 4.1.0 (U2 level 79)
- DB version: Oracle 12c
- OS : Ubuntu 22.04.2 LTS
- JDK: openjdk version “11.0.21”
- SQL developer: Version 23.1.0.097
Prerequisites:
- WSO2 MI installation: install and set up WSO2 MI on your server.
Configure Oracle 12c DB with docker
If you need a database to test against, you can follow the steps and commands below to set up an Oracle 12c Docker image.
To search for Oracle-related images
sudo docker search oracle --filter=stars=17
To pull this image
sudo docker pull truevoly/oracle-12c
To list all the locally available Docker images on your system
sudo docker images
To create a new directory (ora_data) in the /var/local/ path
sudo mkdir -p /var/local/ora_data
To grant read, write, and execute (full) permissions to all users (owner, group, and others) for all files and directories under /var/local/ora_data/
sudo chmod -R 777 /var/local/ora_data/
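To run a Docker container for Oracle Database 12c. Note that -v with a single path, as used here, creates an anonymous volume at that path inside the container; to store the data in the /var/local/ora_data directory created above, use the host:container form of -v instead.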
sudo docker run \
-p 1521:1521 -p 5500:5500 \
-e ORACLE_SID=sys \
-e ORACLE_PWD=oracle \
-e ORACLE_MEM=4000 \
-v /opt/oracle/ora_data \
-d \
docker.io/truevoly/oracle-12c:latest
To open an interactive shell session (/bin/bash) within the container (replace c5223891ab6e with your own container ID, which sudo docker ps lists)
sudo docker exec -it c5223891ab6e /bin/bash
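The container ID in the command above is specific to my machine. Here is a minimal sketch for looking up the ID on your own system, assuming the container was started from the truevoly/oracle-12c image as shown earlier. The function name oracle_container_id and the DOCKER variable are my own helpers, not part of Docker.

```shell
#!/usr/bin/env bash
# Command used to invoke Docker; override it (e.g. DOCKER=docker) if you do not need sudo.
DOCKER="${DOCKER:-sudo docker}"

# Print the ID of the first running container created from the given image.
# Returns non-zero when no such container is running.
oracle_container_id() {
  local image="${1:-truevoly/oracle-12c}"
  local id
  id="$($DOCKER ps --quiet --filter "ancestor=${image}" | head -n 1)"
  [ -n "$id" ] && printf '%s\n' "$id"
}

# Usage (commented out so that sourcing this file has no side effects):
#   sudo docker exec -it "$(oracle_container_id)" /bin/bash
```

If the function prints nothing, the container is probably not running; sudo docker ps -a shows stopped containers as well.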
Connect to the Oracle DB and create the queue
Create a connection in SQL Developer and log in as the sys user
- Username: sys
- Password: oracle
- SID: xe
- Port: 1521
- Role: SYSDBA
Create a new user / schema using the sys connection
create user testuser identified by oracle;
grant connect to testuser;
grant all privileges to testuser;
Grant the AQ package privileges to the user we created
grant all on sys.dbms_aq to testuser;
grant all on dbms_aqadm to testuser;
grant all on dbms_aqin to testuser;
Create a queue table and a queue by executing the statements below in the schema we created previously
EXEC dbms_aqadm.create_queue_table('testqt', 'SYS.AQ$_JMS_TEXT_MESSAGE')
EXEC dbms_aqadm.create_queue('testq','testqt')
EXEC dbms_aqadm.start_queue('testq')
To enable the display of server output (DBMS_OUTPUT), so that dequeued messages are printed
set serverout on
If needed, you can open two SQL Developer instances, push messages to the queue from one, and receive them in the other.
- Use the block below to send a message to the queue directly.
DECLARE
  enqueue_options    DBMS_AQ.ENQUEUE_OPTIONS_T;
  message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
  message_handle     RAW(16);
  msg                SYS.AQ$_JMS_TEXT_MESSAGE;
BEGIN
  -- Build a JMS text message and set its body
  msg := SYS.AQ$_JMS_TEXT_MESSAGE.construct;
  msg.set_text('This is a test message!');
  -- Enqueue the message with default options
  DBMS_AQ.ENQUEUE(
    queue_name         => 'testq',
    enqueue_options    => enqueue_options,
    message_properties => message_properties,
    payload            => msg,
    msgid              => message_handle);
  -- Commit so that the message becomes visible to consumers
  COMMIT;
END;
/
- Use the block below to listen to the queue and dequeue a message from the queue table.
DECLARE
  dequeue_options    DBMS_AQ.DEQUEUE_OPTIONS_T;
  message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
  message_handle     RAW(16);
  msg                SYS.AQ$_JMS_TEXT_MESSAGE;
BEGIN
  -- With the default options, DEQUEUE waits until a message is available
  DBMS_AQ.DEQUEUE(
    queue_name         => 'testq',
    dequeue_options    => dequeue_options,
    message_properties => message_properties,
    payload            => msg,
    msgid              => message_handle);
  -- Print the message text (requires server output to be enabled)
  DBMS_OUTPUT.PUT_LINE(msg.TEXT_VC);
  -- Commit so that the message is removed from the queue
  COMMIT;
END;
/
Configure the JMS Transport Sender and Receiver in WSO2 MI
Add the configurations below to the deployment.toml file in the <MI_HOME>/conf directory.
Configure the JMS Transport Sender
[[transport.jms.sender]]
name = "commonJmsSenderConnectionFactory"
parameter.db_url="jdbc:oracle:thin:@localhost:1521/xe" ### DB url with SID
parameter.initial_naming_factory = "oracle.jms.AQjmsInitialContextFactory"
parameter.connection_factory_name = "QueueConnectionFactory"
parameter.connection_factory_type = "queue"
parameter.naming_security_principal="testuser" ### user
parameter.naming_security_credential="oracle" ### password
[[transport.jms.sender]]
name = "commonTopicPublisherConnectionFactory"
parameter.db_url="jdbc:oracle:thin:@localhost:1521/xe" ### DB url with SID
parameter.initial_naming_factory = "oracle.jms.AQjmsInitialContextFactory"
parameter.connection_factory_name = "TopicConnectionFactory"
parameter.connection_factory_type = "topic"
parameter.naming_security_principal="testuser" ### user
parameter.naming_security_credential="oracle" ### password
Configure the JMS Transport Listener
[[transport.jms.listener]]
name = "AqQueueConnectionFactory"
parameter.initial_naming_factory = "oracle.jms.AQjmsInitialContextFactory"
parameter.db_url="jdbc:oracle:thin:@localhost:1521/xe" ### DB url with SID
parameter.connection_factory_name = "QueueConnectionFactory"
parameter.connection_factory_type = "queue"
parameter.naming_security_principal="testuser" ### user
parameter.naming_security_credential="oracle" ### password
[[transport.jms.listener]]
name = "AqTopicConnectionFactory"
parameter.initial_naming_factory = "oracle.jms.AQjmsInitialContextFactory"
parameter.db_url="jdbc:oracle:thin:@localhost:1521/xe" ### DB url with SID
parameter.connection_factory_name = "TopicConnectionFactory"
parameter.connection_factory_type = "topic"
parameter.naming_security_principal="testuser" ### user
parameter.naming_security_credential="oracle" ### password
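Each [[transport.jms.sender]] and [[transport.jms.listener]] entry is identified by its name, so a quick check for accidental duplicates can be handy after editing deployment.toml. The sketch below is my own helper (the function name duplicate_jms_names is hypothetical), and it assumes that names only need to be unique within the same table type.

```shell
#!/usr/bin/env bash
# Print every JMS connection factory name that is declared more than once
# within the same table type (sender or listener) of a deployment.toml file.
# Returns 0 when no duplicates exist, 1 otherwise.
duplicate_jms_names() {
  local toml="$1"
  local dups
  dups="$(awk '
    # Remember which JMS table we are in
    /^\[\[transport\.jms\.(sender|listener)\]\]/ { section = $0; next }
    # Any other table header ends the JMS table
    /^\[/ { section = ""; next }
    # Inside a JMS table, emit "<table header> <name>"
    section != "" && /^[[:space:]]*name[[:space:]]*=/ {
      split($0, parts, "\"")
      print section " " parts[2]
    }
  ' "$toml" | sort | uniq -d)"
  if [ -n "$dups" ]; then
    printf '%s\n' "$dups"
    return 1
  fi
}

# Usage (MI_HOME is assumed to point at your MI installation):
#   duplicate_jms_names "$MI_HOME/conf/deployment.toml" && echo "names are unique"
```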
Create a proxy service to publish messages to a queue in Oracle AQ
<?xml version="1.0" encoding="UTF-8"?>
<proxy xmlns="http://ws.apache.org/ns/synapse"
name="TestProxy"
transports="http https"
startOnLoad="true">
<description/>
<target>
<inSequence>
<log>
<property name="in" value="==== IN ====="/>
</log>
<property name="FORCE_SC_ACCEPTED" value="true" scope="axis2"/>
<property name="OUT_ONLY" value="true"/>
<send>
<endpoint>
<address uri="jms:/Queues/testq?transport.jms.ConnectionFactoryJNDIName=QueueConnectionFactory&amp;java.naming.factory.initial=oracle.jms.AQjmsInitialContextFactory&amp;transport.jms.DestinationType=queue&amp;java.naming.security.principal=testuser&amp;java.naming.security.credentials=oracle&amp;db_url=jdbc:oracle:thin:@localhost:1521/xe"/>
</endpoint>
</send>
</inSequence>
</target>
</proxy>
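The JMS endpoint URI joins its parameters with "&", and inside an XML attribute each "&" has to be written as "&amp;" for the proxy to be well-formed. Here is a small sketch that assembles the URI used in this post and escapes it. The function name aq_jms_uri is my own, and it assumes the user name, password and DB URL contain no characters that need URL encoding.

```shell
#!/usr/bin/env bash
# Build the JMS endpoint URI for an Oracle AQ queue and escape "&" as "&amp;"
# so the result can be pasted into an XML attribute such as <address uri="..."/>.
aq_jms_uri() {
  local queue="$1" user="$2" password="$3" db_url="$4"
  local uri="jms:/Queues/${queue}"
  uri+="?transport.jms.ConnectionFactoryJNDIName=QueueConnectionFactory"
  uri+="&java.naming.factory.initial=oracle.jms.AQjmsInitialContextFactory"
  uri+="&transport.jms.DestinationType=queue"
  uri+="&java.naming.security.principal=${user}"
  uri+="&java.naming.security.credentials=${password}"
  uri+="&db_url=${db_url}"
  # In sed, "\&" in the replacement is a literal ampersand.
  printf '%s\n' "$uri" | sed 's/&/\&amp;/g'
}

# Usage:
#   aq_jms_uri testq testuser oracle 'jdbc:oracle:thin:@localhost:1521/xe'
```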
Create an Inbound endpoint to consume messages from the queue in Oracle AQ
<?xml version="1.0" encoding="UTF-8"?>
<inboundEndpoint xmlns="http://ws.apache.org/ns/synapse" name="DurableTopicInboundListener" sequence="request" onError="fault" protocol="jms" suspend="false">
<parameters>
<parameter name="interval">1000</parameter>
<parameter name="transport.jms.Destination">Queues/testq</parameter>
<parameter name="transport.jms.CacheLevel">1</parameter>
<parameter name="transport.jms.ConnectionFactoryJNDIName">QueueConnectionFactory</parameter>
<parameter name="sequential">true</parameter>
<parameter name="java.naming.factory.initial">oracle.jms.AQjmsInitialContextFactory</parameter>
<parameter name="java.naming.provider.url">jdbc:oracle:thin:@localhost:1521/xe</parameter>
<parameter name="java.naming.security.principal">testuser</parameter>
<parameter name="java.naming.security.credentials">oracle</parameter>
<parameter name="db_url">jdbc:oracle:thin:@localhost:1521/xe</parameter>
<parameter name="transport.jms.SessionAcknowledgement">AUTO_ACKNOWLEDGE</parameter>
<parameter name="transport.jms.SessionTransacted">false</parameter>
<parameter name="transport.jms.ConnectionFactoryType">queue</parameter>
</parameters>
</inboundEndpoint>
Deploy the request sequence below
- The inbound endpoint invokes it for each message it consumes from the queue.
Eg:
<?xml version="1.0" encoding="UTF-8"?>
<sequence name="request" trace="disable" xmlns="http://ws.apache.org/ns/synapse">
<log level="full"/>
<drop/>
</sequence>
The fault sequence below is invoked when an error occurs while a message is being processed
<sequence xmlns="http://ws.apache.org/ns/synapse" name="fault">
<!-- Log the message at the full log level with the ERROR_MESSAGE and the ERROR_CODE-->
<log level="full">
<property name="MESSAGE" value="Executing default 'fault' sequence"/>
<property name="ERROR_CODE" expression="get-property('ERROR_CODE')"/>
<property name="ERROR_MESSAGE" expression="get-property('ERROR_MESSAGE')"/>
</log>
<!-- Drops the messages by default if there is a fault -->
<drop/>
</sequence>
- Additionally, you need to add the JAR files below to the <MI_HOME>/lib directory.
- If needed, you can use the JARs I used for testing from here.
- aqapi.jar
- jmscommon.jar
- jta.jar
- ojdbc7.jar
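Before restarting MI, it can help to confirm that all four JARs actually landed in the lib directory. This is a minimal sketch of such a check; the function name missing_aq_jars is my own, and MI_HOME in the usage line is assumed to point at your MI installation.

```shell
#!/usr/bin/env bash
# Report which of the JARs listed above are missing from a lib directory.
# Prints one missing file name per line; returns 0 only when all are present.
missing_aq_jars() {
  local lib_dir="$1"
  local jar missing=0
  for jar in aqapi.jar jmscommon.jar jta.jar ojdbc7.jar; do
    if [ ! -f "${lib_dir}/${jar}" ]; then
      printf '%s\n' "$jar"
      missing=1
    fi
  done
  return "$missing"
}

# Usage:
#   missing_aq_jars "$MI_HOME/lib" && echo "all JARs present"
```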
Once you have completed all the required configurations, you can test the scenario.
Eg:
curl --location 'http://localhost:8290/services/TestProxy' \
--header 'Content-Type: application/xml' \
--data '<soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/" xmlns:wsa="http://www.w3.org/2005/08/addressing">
<soapenv:Body>
<m0:getQuote xmlns:m0="http://services.samples">
<m0:request>
<m0:symbol>test</m0:symbol>
</m0:request>
</m0:getQuote>
</soapenv:Body>
</soapenv:Envelope>'
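Because the proxy sets FORCE_SC_ACCEPTED and OUT_ONLY, I would expect it to answer with HTTP 202 Accepted and no response body, so checking the status code is a quick way to see whether the request was taken. A small sketch, assuming the payload has been saved to a file; the function name proxy_status, the CURL variable and the file name request.xml are my own.

```shell
#!/usr/bin/env bash
# HTTP client command; override it to try the function without a running MI server.
CURL="${CURL:-curl}"

# POST an XML payload file to the proxy and print only the HTTP status code.
proxy_status() {
  local url="$1" payload_file="$2"
  $CURL --silent --output /dev/null --write-out '%{http_code}' \
    --header 'Content-Type: application/xml' \
    --data "@${payload_file}" "$url"
}

# Usage:
#   proxy_status 'http://localhost:8290/services/TestProxy' request.xml
```

After a successful call, the message should show up in the MI log via the request sequence, or it can be dequeued in SQL Developer if the inbound endpoint is not deployed.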
I hope you find this blog post useful! Now it’s your turn to dive in and explore further.
Cheers!!