再次查询视图可以确定消息确实没被移除:
SELECT user_data
FROM aq$demo_queue_table;
USER_DATA(MESSAGE)
------------------------------------------------------------
DEMO_QUEUE_PAYLOAD_TYPE('Here is a message')
4、出列消息(dequeuing messages)
现在我们将实际出列消息。该操作不要求在同一会话进行(记住入列是AQ基于表的提交事务)。像入列,出列也是一个事务(从队列表
移除消息)。
DECLARE r_dequeue_options DBMS_AQ.DEQUEUE_OPTIONS_T; r_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T; v_message_handle RAW(16); o_payload demo_queue_payload_type; BEGIN DBMS_AQ.DEQUEUE( queue_name => 'demo_queue', dequeue_options => r_dequeue_options, message_properties => r_message_properties, payload => o_payload, msgid => v_message_handle ); DBMS_OUTPUT.PUT_LINE( '*** Dequeued message is [' || o_payload.message || '] ***' ); COMMIT; END; / *** Dequeued message is [Here is a message] *** PL/SQL procedure successfully completed.
再次查询视图发现消息确已出列:
SELECT COUNT(*) FROM aq$demo_queue_table; COUNT(*) ---------- 0
5、通知(notification)
文章的剩余部分,我们将看一下通过通知自动出列。通过这种方式无论消息何时入列, Oracle都将通知一个代理执行一个注册的PLSQL
"回调"(callback)过程(可选择地,代理还可以通知一个邮箱地址或HTTP://地址)。
为了说明,我们将创建和注册一个PLSQL过程以通过通知方式管理我们的出列。这个回调过程将出列消息并写到一个数据库表,以模拟
标准数据库操作。
BEGIN DBMS_AQADM.STOP_QUEUE( queue_name => 'demo_queue' ); DBMS_AQADM.DROP_QUEUE( queue_name => 'demo_queue' ); DBMS_AQADM.DROP_QUEUE_TABLE( queue_table => 'demo_queue_table' ); END; /
现在我们重新创建队列表以允许多个消费者(consumers)。一个消费者是一个出列消息代理(agent)启用多个消费者是自动通知实现的
前提条件。
BEGIN DBMS_AQADM.CREATE_QUEUE_TABLE ( queue_table => 'demo_queue_table', queue_payload_type => 'demo_queue_payload_type', multiple_consumers => TRUE ); END; /
接着重新创建并启动我们的队列。
BEGIN DBMS_AQADM.CREATE_QUEUE ( queue_name => 'demo_queue', queue_table => 'demo_queue_table' ); DBMS_AQADM.START_QUEUE ( queue_name => 'demo_queue' ); END; /
为了证明通知的异步特点,我们将把出列消息存在一个应用表中。
CREATE TABLE demo_queue_message_table
( message VARCHAR2(4000) );
现在我们有一个应用表,我们可以创建回调PL/SQL。这个过程将出列触发了通知的入列消息。程序参数必须命名并类型化。入列消息将
包含入列时间戳,这样插入到应用表中我们将看到消息入列和通知出列的异步延迟。
CREATE PROCEDURE demo_queue_callback_procedure( context RAW, reginfo SYS.AQ$_REG_INFO, descr SYS.AQ$_DESCRIPTOR, payload RAW, payloadl NUMBER ) AS r_dequeue_options DBMS_AQ.DEQUEUE_OPTIONS_T; r_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T; v_message_handle RAW(16); o_payload demo_queue_payload_type; BEGIN r_dequeue_options.msgid := descr.msg_id; r_dequeue_options.consumer_name := descr.consumer_name; DBMS_AQ.DEQUEUE( queue_name => descr.queue_name, dequeue_options => r_dequeue_options, message_properties => r_message_properties, payload => o_payload, msgid => v_message_handle ); INSERT INTO demo_queue_message_table ( message ) VALUES ( 'Message [' || o_payload.message || '] ' || 'dequeued at [' || TO_CHAR( SYSTIMESTAMP,