Skip to main content

Command Palette

Search for a command to run...

A Simple Oracle Advanced Queue (DBMS AQ) Example: Automatic Callback #JoelKallmanDay

Updated
8 min read
A Simple Oracle Advanced Queue (DBMS AQ) Example: Automatic Callback #JoelKallmanDay

Oracle Advanced Queuing (AQ) provides a powerful mechanism for automatic, event-driven data processing with built-in retry mechanisms and callback functions.

In this post, we'll explore how to implement a fully automated data synchronization system using DBMS_AQ callbacks that automatically process messages as they arrive, with intelligent retry handling for failed operations.

Complete Implementation on GitHub - Full working code example

What is Oracle Advanced Queuing?

Oracle Advanced Queuing is a message queuing system built into the Oracle Database that enables automatic, event-driven communication between applications. Key features include:

  • Automatic message processing via callback functions
  • Built-in retry mechanisms for failed operations
  • Event-driven architectures with immediate processing
  • Reliable message delivery with persistence and recovery

The Problem: System Integration

Modern enterprises often need to synchronize data between multiple systems:

  • CRM systems to data warehouses
  • ERP systems to analytics databases
  • Web applications to backup systems
  • Real-time data feeds between microservices

The Solution: AQ-Based Data Synchronization

Our implementation creates a fully automated queue-based system that handles synchronization tasks asynchronously with automatic retry capabilities and immediate callback processing.

Key Improvements Over Basic AQ Tutorial

This implementation extends the Oracle-Base AQ tutorial with:

  • Automatic callback processing - Messages processed immediately upon arrival
  • Built-in retry mechanisms - Failed operations automatically retry with configurable delays
  • Comprehensive monitoring - Track retry counts, message states, and operation history
  • Production-ready features - Error handling, audit trails, and status tracking

Architecture Overview

┌─────────────────┐    ┌──────────────┐    ┌──────────────┐    ┌─────────────────┐
│   Source        │───▶│  AQ Queue    │───▶│ Auto Callback│───▶│   Target        │
│   System        │    │ (with Retry) │    │ (Immediate)  │    │   System        │
└─────────────────┘    └──────────────┘    └──────────────┘    └─────────────────┘
                              │                      │
                              ▼                      ▼
                       ┌──────────────┐
                       │ Retry Logic  │
                       │ (5 attempts) │
                       └──────────────┘

Implementation Details

1. Database Schema Setup

First, we create a dedicated schema for our AQ implementation:

-- Create user with necessary privileges
create user "AQ_DEMO" identified by "Password123*";
grant create session, create table, create type, create procedure to "AQ_DEMO";
grant execute on dbms_aqadm, dbms_aq to "AQ_DEMO";
grant unlimited tablespace to "AQ_DEMO";

2. Sync Operations Tracking

We maintain a history of all synchronization operations:

create table sync_operations (
    id                number generated by default as identity primary key,
    source_system     varchar2(50) not null,
    target_system     varchar2(50) not null,
    sync_type         varchar2(30) not null,
    record_count      number not null,
    sync_status       varchar2(20) default 'COMPLETED',
    created_on        timestamp with local time zone default localtimestamp
);

Key Features:

  • Identity column for automatic ID generation
  • Audit trail with creation timestamps
  • Status tracking (PENDING, PROCESSING, COMPLETED, FAILED)
  • Sync type classification (FULL, INCREMENTAL, DELTA, REALTIME)

3. Message Type Definition

We define a custom object type for our sync messages:

create or replace type sync_task_type as object (
    source_system    varchar2(50),
    target_system    varchar2(50),
    sync_type        varchar2(30),
    record_count     number,
    priority         number
);

4. Queue Configuration

The queue is configured with retry mechanisms:

  -- Create queue table
  dbms_aqadm.create_queue_table(
    queue_table        => 'AQ_DEMO.SYNC_QUEUE_TABLE',
    queue_payload_type => 'AQ_DEMO.SYNC_TASK_TYPE',
    multiple_consumers => false,
    comment            => 'Queue table for data synchronization tasks'
  );

  -- Create queue with retry configuration
  dbms_aqadm.create_queue(
    queue_name     => 'AQ_DEMO.SYNC_QUEUE',
    queue_table    => 'AQ_DEMO.SYNC_QUEUE_TABLE',
    max_retries    => 5,        -- Maximum number of retry attempts
    retry_delay    => 30,       -- Delay in seconds between retries
    retention_time => 3600,     -- Retention time in seconds (1 hour)
    comment        => 'Queue for data synchronization tasks with retry configuration'
  );

  --
  begin
    -- Start the queue
    dbms_aqadm.start_queue('AQ_DEMO.SYNC_QUEUE');
  end;

Retry Configuration Benefits:

  • Automatic retry for failed messages
  • Configurable delays to avoid overwhelming systems
  • Message retention for debugging and auditing

5. Automatic Message Processing

The callback procedure processes messages automatically:

create or replace procedure sync_callback(
    context   in raw
  , reginfo   in sys.aq$_reg_info
  , descr     in sys.aq$_descriptor
  , payloadl  in number
  , payload   in varchar2
) as
  l_message sync_task_type;
  l_dequeue_options dbms_aq.dequeue_options_t;
  l_message_props   dbms_aq.message_properties_t;
  l_message_id      raw(16);

  l_sync_type       varchar2(100);
begin
  dbms_output.put_line('Sync callback triggered at ' || to_char(systimestamp, 'HH24:MI:SS'));

  -- Configure dequeue options
  -- WAIT: Controls how long to wait for messages
  --   • NO_WAIT: Return immediately if no message available
  --   • FOREVER: [default] Wait indefinitely for a message
  --   • Number: Wait specified seconds (0-4294967295)
  l_dequeue_options.wait := dbms_aq.no_wait;

  -- NAVIGATION: Which message to retrieve from queue
  --   • FIRST_MESSAGE: [default] Get first available message
  --   • NEXT_MESSAGE: Get next message in sequence
  --   • FIRST_MESSAGE_MULTI_GROUP: First message across consumer groups
  --   • NEXT_MESSAGE_MULTI_GROUP: Next message across consumer groups
  --l_dequeue_options.navigation := dbms_aq.first_message;

  -- VISIBILITY: When dequeue operation becomes visible to other transactions
  --   • ON_COMMIT: [default] Changes visible only after transaction commit
  --   • IMMEDIATE: Changes visible immediately
  l_dequeue_options.visibility := dbms_aq.on_commit;

  -- DEQUEUE_MODE: What happens to the message after dequeue
  --   • BROWSE: Read message but keep it in queue
  --   • LOCKED: Lock message for exclusive access
  --   • REMOVE: [default] Delete message after reading
  --   • REMOVE_NODATA: Delete message but don't return payload
  l_dequeue_options.dequeue_mode := dbms_aq.remove;

  -- DELIVERY_MODE: How messages are stored and delivered
  --   • PERSISTENT: [default] Messages stored in database tables (durable)
  --   • BUFFERED: Messages kept in memory (faster)
  --   • PERSISTENT_OR_BUFFERED: Use either mode as appropriate
  --l_dequeue_options.delivery_mode := dbms_aq.persistent;

  -- Additional dequeue options available:
  -- l_dequeue_options.consumer_name := 'consumer_name';  -- Target specific consumer in multi-consumer queues
  -- l_dequeue_options.msgid := raw_message_id;           -- Dequeue specific message by its unique ID
  -- l_dequeue_options.correlation := 'correlation_id';   -- Filter messages by correlation identifier
  -- l_dequeue_options.deq_condition := 'priority > 5';   -- SQL WHERE condition to filter which messages to dequeue
  -- l_dequeue_options.transformation := 'transform_name'; -- Apply transformation function to message payload before returning

  -- Available descriptor properties for logging/debugging:
  -- descr.msg_id          - Message ID (RAW)
  -- descr.queue_name      - Queue name where message is located
  -- descr.consumer_name   - Consumer name (for multi-consumer queues)
  -- Note: msg_priority and msg_state are not directly available on descriptor


  logger.log('msg_id: ' || descr.msg_id);
/*   logger.log('queue_name: ' || descr.queue_name);
  logger.log('consumer_name: ' || descr.consumer_name);
  logger.log('payload: ' || payload);
  logger.log('payloadl: ' || payloadl);
   */
  l_dequeue_options.msgid := descr.msg_id;

  -- Dequeue the message
  dbms_aq.dequeue(
      queue_name         => 'AQ_DEMO.SYNC_QUEUE'
    , dequeue_options    => l_dequeue_options
    , message_properties => l_message_props
    , payload            => l_message
    , msgid              => l_message_id
  );
  l_sync_type := l_message.sync_type;

  -- Process the sync task
  insert into sync_operations (source_system, target_system, sync_type, record_count, sync_status)
  values (l_message.source_system, l_message.target_system, l_message.sync_type, l_message.record_count, 'COMPLETED');

  dbms_output.put_line('Sync operation completed: ' || l_message.source_system || ' -> ' || l_message.target_system);
  --commit;
exception
  when others then
    --rollback;
    dbms_output.put_line('Error in sync callback: ' || sqlerrm);

    raise;
end;

6. Regiter the callBack

-- When our queue SYNC_QUEUE will receibe an element,
-- this PLSQL is going to triger
begin
  dbms_aq.register(
      sys.aq$_reg_info_list(
          sys.aq$_reg_info(
              'AQ_DEMO.SYNC_QUEUE'
            , dbms_aq.namespace_aq
            , 'PLSQL://AQ_DEMO.SYNC_CALLBACK'
            , null
          )
      )
    , 1
  );
  dbms_output.put_line('Sync notification registered');
end;
/

8. Start the queue

begin
  -- Start the queue
  dbms_aqadm.start_queue('AQ_DEMO.SYNC_QUEUE');
end;
/

8. Queue Management API

A simple procedure to queue sync tasks:

create or replace procedure queue_sync_task(
    p_source_system in varchar2
  , p_target_system in varchar2
  , p_sync_type     in varchar2
  , p_record_count  in number
  , p_priority      in number default 5
) as
  l_enqueue_options dbms_aq.enqueue_options_t;
  l_message_props   dbms_aq.message_properties_t;
  l_message         sync_task_type;
  l_msgid           raw(16);
begin
  dbms_output.put_line('Queueing sync task: ' || p_source_system || ' -> ' || p_target_system);

  -- Create sync task message
  l_message := sync_task_type(
      source_system => p_source_system,
      target_system => p_target_system,
      sync_type     => p_sync_type,
      record_count  => p_record_count,
      priority      => p_priority
  );

  -- Configure enqueue options
  l_enqueue_options.visibility := dbms_aq.on_commit;

  -- Enqueue the sync task
  dbms_aq.enqueue(
      queue_name         => 'AQ_DEMO.SYNC_QUEUE'
    , enqueue_options    => l_enqueue_options
    , message_properties => l_message_props
    , payload            => l_message
    , msgid              => l_msgid
  );

  commit;
  dbms_output.put_line('Sync task queued successfully');

exception
  when others then
    rollback;
    dbms_output.put_line('Error queueing sync task: ' || sqlerrm);
    raise;
end;

Usage Examples

Queuing Sync Tasks

-- Queue different types of sync operations
begin
    queue_sync_task('CRM_SYSTEM', 'DATA_WAREHOUSE', 'INCREMENTAL', 1250, 3);
    queue_sync_task('ERP_SYSTEM', 'ANALYTICS_DB', 'FULL', 50000, 5);
    queue_sync_task('WEB_APP', 'BACKUP_SYSTEM', 'DELTA', 340, 1);
end;

Monitoring Operations

-- Check completed sync operations
select source_system
      , target_system
      , sync_type
      , record_count
      , sync_status
      , created_on
   from sync_operations
  order by created_on desc;

Queue Status Monitoring

-- Monitor queue status and retry counts
select treat(user_data as sync_task_type).source_system
     , treat(user_data as sync_task_type).target_system
     , enq_time
     , deq_time
     , retry_count
     , msg_state
  from aq$sync_queue_table
 order by enq_time desc;

Benefits of This Approach

1. Reliability

  • Automatic retry mechanisms for failed operations
  • Message persistence ensures no data loss
  • Transaction-safe operations

2. Scalability

  • Asynchronous processing doesn't block source systems
  • Multiple consumers can process messages in parallel, This is a different example, where the multiple_consumers = TRUE and you need to register consumers; this example is only a single consumer.
  • Queue-based architecture handles load spikes

Real-World Applications

This pattern is particularly useful for:

  • ETL Processes: Triggering data warehouse updates
  • Microservices: Event-driven communication
  • Data Replication: Keeping systems in sync
  • Backup Operations: Asynchronous backup triggers
  • Analytics: Real-time data feed processing

Conclusion

Oracle Advanced Queuing provides a robust foundation for building reliable data synchronization systems. The combination of automatic retry mechanisms, message persistence, and flexible configuration makes it an excellent choice for enterprise integration scenarios.

The implementation shown here demonstrates how to create a production-ready sync system that can handle various types of data synchronization tasks while providing comprehensive monitoring and error handling capabilities.


References and Further Reading

Official Oracle Documentation

Tutorials and Examples

Source Code

Troubleshooting

More from this blog

A little knowledge to share-Oracle APEX

17 posts

Soy Ingeniero de Aplicaciones desde el 2012, he trabajado con Oracle APEX desde el 2017en versiones 5, 18, 20, 21. En los ultimos años he desarrollado habilidades en CSS, JavaScript, Jquery y PlSql , I specialize in Oracle APEX (Oracle Application Express )

A Simple Oracle Advanced Queue (DBMS AQ) Example: Automatic Callback #JoelKallmanDay