A Simple Oracle Advanced Queue (DBMS AQ) Example: Automatic Callback #JoelKallmanDay

Oracle Advanced Queuing (AQ) provides a powerful mechanism for automatic, event-driven data processing with built-in retry mechanisms and callback functions.
In this post, we'll explore how to implement a fully automated data synchronization system using DBMS_AQ callbacks that automatically process messages as they arrive, with intelligent retry handling for failed operations.
Complete Implementation on GitHub - Full working code example
What is Oracle Advanced Queuing?
Oracle Advanced Queuing is a message queuing system built into the Oracle Database that enables automatic, event-driven communication between applications. Key features include:
- Automatic message processing via callback functions
- Built-in retry mechanisms for failed operations
- Event-driven architectures with immediate processing
- Reliable message delivery with persistence and recovery
The Problem: System Integration
Modern enterprises often need to synchronize data between multiple systems:
- CRM systems to data warehouses
- ERP systems to analytics databases
- Web applications to backup systems
- Real-time data feeds between microservices
The Solution: AQ-Based Data Synchronization
Our implementation creates a fully automated queue-based system that handles synchronization tasks asynchronously with automatic retry capabilities and immediate callback processing.
Key Improvements Over Basic AQ Tutorial
This implementation extends the Oracle-Base AQ tutorial with:
- ✅ Automatic callback processing - Messages processed immediately upon arrival
- ✅ Built-in retry mechanisms - Failed operations automatically retry with configurable delays
- ✅ Comprehensive monitoring - Track retry counts, message states, and operation history
- ✅ Production-ready features - Error handling, audit trails, and status tracking
Architecture Overview
┌─────────────────┐ ┌──────────────┐ ┌──────────────┐ ┌─────────────────┐
│ Source │───▶│ AQ Queue │───▶│ Auto Callback│───▶│ Target │
│ System │ │ (with Retry) │ │ (Immediate) │ │ System │
└─────────────────┘ └──────────────┘ └──────────────┘ └─────────────────┘
│ │
▼ ▼
┌──────────────┐
│ Retry Logic │
│ (5 attempts) │
└──────────────┘
Implementation Details
1. Database Schema Setup
First, we create a dedicated schema for our AQ implementation:
-- Create user with necessary privileges
create user "AQ_DEMO" identified by "Password123*";
grant create session, create table, create type, create procedure to "AQ_DEMO";
grant execute on dbms_aqadm, dbms_aq to "AQ_DEMO";
grant unlimited tablespace to "AQ_DEMO";
2. Sync Operations Tracking
We maintain a history of all synchronization operations:
create table sync_operations (
id number generated by default as identity primary key,
source_system varchar2(50) not null,
target_system varchar2(50) not null,
sync_type varchar2(30) not null,
record_count number not null,
sync_status varchar2(20) default 'COMPLETED',
created_on timestamp with local time zone default localtimestamp
);
Key Features:
- Identity column for automatic ID generation
- Audit trail with creation timestamps
- Status tracking (PENDING, PROCESSING, COMPLETED, FAILED)
- Sync type classification (FULL, INCREMENTAL, DELTA, REALTIME)
3. Message Type Definition
We define a custom object type for our sync messages:
create or replace type sync_task_type as object (
source_system varchar2(50),
target_system varchar2(50),
sync_type varchar2(30),
record_count number,
priority number
);
4. Queue Configuration
The queue is configured with retry mechanisms:
-- Create queue table
dbms_aqadm.create_queue_table(
queue_table => 'AQ_DEMO.SYNC_QUEUE_TABLE',
queue_payload_type => 'AQ_DEMO.SYNC_TASK_TYPE',
multiple_consumers => false,
comment => 'Queue table for data synchronization tasks'
);
-- Create queue with retry configuration
dbms_aqadm.create_queue(
queue_name => 'AQ_DEMO.SYNC_QUEUE',
queue_table => 'AQ_DEMO.SYNC_QUEUE_TABLE',
max_retries => 5, -- Maximum number of retry attempts
retry_delay => 30, -- Delay in seconds between retries
retention_time => 3600, -- Retention time in seconds (1 hour)
comment => 'Queue for data synchronization tasks with retry configuration'
);
--
begin
-- Start the queue
dbms_aqadm.start_queue('AQ_DEMO.SYNC_QUEUE');
end;
Retry Configuration Benefits:
- Automatic retry for failed messages
- Configurable delays to avoid overwhelming systems
- Message retention for debugging and auditing
5. Automatic Message Processing
The callback procedure processes messages automatically:
create or replace procedure sync_callback(
context in raw
, reginfo in sys.aq$_reg_info
, descr in sys.aq$_descriptor
, payloadl in number
, payload in varchar2
) as
l_message sync_task_type;
l_dequeue_options dbms_aq.dequeue_options_t;
l_message_props dbms_aq.message_properties_t;
l_message_id raw(16);
l_sync_type varchar2(100);
begin
dbms_output.put_line('Sync callback triggered at ' || to_char(systimestamp, 'HH24:MI:SS'));
-- Configure dequeue options
-- WAIT: Controls how long to wait for messages
-- • NO_WAIT: Return immediately if no message available
-- • FOREVER: [default] Wait indefinitely for a message
-- • Number: Wait specified seconds (0-4294967295)
l_dequeue_options.wait := dbms_aq.no_wait;
-- NAVIGATION: Which message to retrieve from queue
-- • FIRST_MESSAGE: [default] Get first available message
-- • NEXT_MESSAGE: Get next message in sequence
-- • FIRST_MESSAGE_MULTI_GROUP: First message across consumer groups
-- • NEXT_MESSAGE_MULTI_GROUP: Next message across consumer groups
--l_dequeue_options.navigation := dbms_aq.first_message;
-- VISIBILITY: When dequeue operation becomes visible to other transactions
-- • ON_COMMIT: [default] Changes visible only after transaction commit
-- • IMMEDIATE: Changes visible immediately
l_dequeue_options.visibility := dbms_aq.on_commit;
-- DEQUEUE_MODE: What happens to the message after dequeue
-- • BROWSE: Read message but keep it in queue
-- • LOCKED: Lock message for exclusive access
-- • REMOVE: [default] Delete message after reading
-- • REMOVE_NODATA: Delete message but don't return payload
l_dequeue_options.dequeue_mode := dbms_aq.remove;
-- DELIVERY_MODE: How messages are stored and delivered
-- • PERSISTENT: [default] Messages stored in database tables (durable)
-- • BUFFERED: Messages kept in memory (faster)
-- • PERSISTENT_OR_BUFFERED: Use either mode as appropriate
--l_dequeue_options.delivery_mode := dbms_aq.persistent;
-- Additional dequeue options available:
-- l_dequeue_options.consumer_name := 'consumer_name'; -- Target specific consumer in multi-consumer queues
-- l_dequeue_options.msgid := raw_message_id; -- Dequeue specific message by its unique ID
-- l_dequeue_options.correlation := 'correlation_id'; -- Filter messages by correlation identifier
-- l_dequeue_options.deq_condition := 'priority > 5'; -- SQL WHERE condition to filter which messages to dequeue
-- l_dequeue_options.transformation := 'transform_name'; -- Apply transformation function to message payload before returning
-- Available descriptor properties for logging/debugging:
-- descr.msg_id - Message ID (RAW)
-- descr.queue_name - Queue name where message is located
-- descr.consumer_name - Consumer name (for multi-consumer queues)
-- Note: msg_priority and msg_state are not directly available on descriptor
logger.log('msg_id: ' || descr.msg_id);
/* logger.log('queue_name: ' || descr.queue_name);
logger.log('consumer_name: ' || descr.consumer_name);
logger.log('payload: ' || payload);
logger.log('payloadl: ' || payloadl);
*/
l_dequeue_options.msgid := descr.msg_id;
-- Dequeue the message
dbms_aq.dequeue(
queue_name => 'AQ_DEMO.SYNC_QUEUE'
, dequeue_options => l_dequeue_options
, message_properties => l_message_props
, payload => l_message
, msgid => l_message_id
);
l_sync_type := l_message.sync_type;
-- Process the sync task
insert into sync_operations (source_system, target_system, sync_type, record_count, sync_status)
values (l_message.source_system, l_message.target_system, l_message.sync_type, l_message.record_count, 'COMPLETED');
dbms_output.put_line('Sync operation completed: ' || l_message.source_system || ' -> ' || l_message.target_system);
--commit;
exception
when others then
--rollback;
dbms_output.put_line('Error in sync callback: ' || sqlerrm);
raise;
end;
6. Regiter the callBack
-- When our queue SYNC_QUEUE will receibe an element,
-- this PLSQL is going to triger
begin
dbms_aq.register(
sys.aq$_reg_info_list(
sys.aq$_reg_info(
'AQ_DEMO.SYNC_QUEUE'
, dbms_aq.namespace_aq
, 'PLSQL://AQ_DEMO.SYNC_CALLBACK'
, null
)
)
, 1
);
dbms_output.put_line('Sync notification registered');
end;
/
8. Start the queue
begin
-- Start the queue
dbms_aqadm.start_queue('AQ_DEMO.SYNC_QUEUE');
end;
/
8. Queue Management API
A simple procedure to queue sync tasks:
create or replace procedure queue_sync_task(
p_source_system in varchar2
, p_target_system in varchar2
, p_sync_type in varchar2
, p_record_count in number
, p_priority in number default 5
) as
l_enqueue_options dbms_aq.enqueue_options_t;
l_message_props dbms_aq.message_properties_t;
l_message sync_task_type;
l_msgid raw(16);
begin
dbms_output.put_line('Queueing sync task: ' || p_source_system || ' -> ' || p_target_system);
-- Create sync task message
l_message := sync_task_type(
source_system => p_source_system,
target_system => p_target_system,
sync_type => p_sync_type,
record_count => p_record_count,
priority => p_priority
);
-- Configure enqueue options
l_enqueue_options.visibility := dbms_aq.on_commit;
-- Enqueue the sync task
dbms_aq.enqueue(
queue_name => 'AQ_DEMO.SYNC_QUEUE'
, enqueue_options => l_enqueue_options
, message_properties => l_message_props
, payload => l_message
, msgid => l_msgid
);
commit;
dbms_output.put_line('Sync task queued successfully');
exception
when others then
rollback;
dbms_output.put_line('Error queueing sync task: ' || sqlerrm);
raise;
end;
Usage Examples
Queuing Sync Tasks
-- Queue different types of sync operations
begin
queue_sync_task('CRM_SYSTEM', 'DATA_WAREHOUSE', 'INCREMENTAL', 1250, 3);
queue_sync_task('ERP_SYSTEM', 'ANALYTICS_DB', 'FULL', 50000, 5);
queue_sync_task('WEB_APP', 'BACKUP_SYSTEM', 'DELTA', 340, 1);
end;
Monitoring Operations
-- Check completed sync operations
select source_system
, target_system
, sync_type
, record_count
, sync_status
, created_on
from sync_operations
order by created_on desc;
Queue Status Monitoring
-- Monitor queue status and retry counts
select treat(user_data as sync_task_type).source_system
, treat(user_data as sync_task_type).target_system
, enq_time
, deq_time
, retry_count
, msg_state
from aq$sync_queue_table
order by enq_time desc;
Benefits of This Approach
1. Reliability
- Automatic retry mechanisms for failed operations
- Message persistence ensures no data loss
- Transaction-safe operations
2. Scalability
- Asynchronous processing doesn't block source systems
- Multiple consumers can process messages in parallel, This is a different example, where the multiple_consumers = TRUE and you need to register consumers; this example is only a single consumer.
- Queue-based architecture handles load spikes
Real-World Applications
This pattern is particularly useful for:
- ETL Processes: Triggering data warehouse updates
- Microservices: Event-driven communication
- Data Replication: Keeping systems in sync
- Backup Operations: Asynchronous backup triggers
- Analytics: Real-time data feed processing
Conclusion
Oracle Advanced Queuing provides a robust foundation for building reliable data synchronization systems. The combination of automatic retry mechanisms, message persistence, and flexible configuration makes it an excellent choice for enterprise integration scenarios.
The implementation shown here demonstrates how to create a production-ready sync system that can handle various types of data synchronization tasks while providing comprehensive monitoring and error handling capabilities.
References and Further Reading
Official Oracle Documentation
- Oracle Database 21c: Advanced Queuing Types - Comprehensive AQ type definitions
- Oracle Database 21c: DBMS_AQ Package - Complete API reference
- Oracle Database 12c: AQ Operations Guide - Advanced queue operations
Tutorials and Examples
- Oracle-Base: Advanced Queuing in Oracle Database - Base tutorial that inspired this implementation
- Asynchronous Processing using AQ Callback - Callback implementation patterns
Source Code
- Complete Implementation on GitHub - Full working code example with retry mechanisms and callback processing
Troubleshooting
- Stack Overflow: Oracle AQ Dequeue Issues - Common dequeue problems and solutions
