ueue',
dequeue_options => r_dequeue_options,
message_properties => r_message_properties,
payload => o_payload,
msgid => v_message_handle
);
DBMS_OUTPUT.PUT_LINE(
'*** Browsed message is [' || o_payload.message || '] ***'
);
END;
/
*** Browsed message is [Here is a message] ***
再次查询视图可以确定消息确实没被移除:
SELECT user_data
FROM aq$demo_queue_table;
USER_DATA(MESSAGE)
------------------------------------------------------------
DEMO_QUEUE_PAYLOAD_TYPE('Here is a message')
4、出列消息(dequeuing messages)
现在我们将实际出列消息。该操作不要求在同一会话进行(记住入列是AQ基于表的提交事务)。像入列,出列也是一个事务(从队列表
移除消息)。
DECLARE
r_dequeue_options DBMS_AQ.DEQUEUE_OPTIONS_T;
r_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
v_message_handle RAW(16);
o_payload demo_queue_payload_type;
BEGIN
DBMS_AQ.DEQUEUE(
queue_name => 'demo_queue',
dequeue_options => r_dequeue_options,
message_properties => r_message_properties,
payload => o_payload,
msgid => v_message_handle
);
DBMS_OUTPUT.PUT_LINE(
'*** Dequeued message is [' || o_payload.message || '] ***'
);
COMMIT;
END;
/
*** Dequeued message is [Here is a message] ***
PL/SQL procedure successfully completed.
再次查询视图发现消息确已出列:
SELECT COUNT(*)
FROM aq$demo_queue_table;
COUNT(*)
----------
0
5、通知(notification)
文章的剩余部分,我们将看一下通过通知自动出列。通过这种方式无论消息何时入列,
Oracle都将通知一个代理执行一个注册的PLSQL
"回调"(callback)过程(可选择地,代理还可以通知一个邮箱地址或HTTP://地址)。
为了说明,我们将创建和注册一个PLSQL过程以通过通知方式管理我们的出列。这个回调过程将出列消息并写到一个数据库表,以模拟
标准数据库操作。
作为开始,我们清理之前创建的对象。
BEGIN
DBMS_AQADM.STOP_QUEUE(
queue_name => 'demo_queue'
);
DBMS_AQADM.DROP_QUEUE(
queue_name => 'demo_queue'
);
DBMS_AQADM.DROP_QUEUE_TABLE(
queue_table => 'demo_queue_table'
);
END;
/
现在我们重新创建队列表以允许多个消费者(consumers)。一个消费者是一个出列消息代理(agent)启用多个消费者是自动通知实现的
前提条件。
BEGIN
DBMS_AQADM.CREATE_QUEUE_TABLE (
queue_table => 'demo_queue_table',
queue_payload_type => 'demo_queue_payload_type',
multiple_consumers => TRUE
);
END;
/
接着重新创建并启动我们的队列。
BEGIN
DBMS_AQADM.CREATE_QUEUE (
queue_name => 'demo_queue',
queue_table => 'demo_queue_table'
);
DBMS_AQADM.START_QUEUE (
queue_name => 'demo_queue'
);
END;
/
为了证明通知的异步特点,我们将把出列消息存在一个应用表中。
CREATE TABLE demo_queue_message_table
( message VARCHAR2(4000) );
现在我们有一个应用表,我们可以创建回调PL/SQL。这个过程将出列触发了通知的入列消息。程序参数必须命名并类型化。入列消息将
包含入列时间戳,这样插入到应用表中我们将看到消息入列和通知出列的异步延迟。
CREATE PROCEDURE demo_queue_callback_procedure(
context RAW,
reginfo SYS.AQ$_REG_INFO,
descr SYS.AQ$_DESCRIPTOR,
payload RAW,
payloadl NUMBER
) AS
r_dequeue_options DBMS_AQ.DEQUEUE_OPTIONS_T;
r_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
v_message_handle RAW(16);
o_payload demo_queue_payload_type;
BEGIN
r_dequeue_options.msgid := descr.msg_id;
r_dequeue_options.consumer_name := descr.consumer_name;
DBMS_AQ.DEQUEUE(
queue_name => descr.queue_name,
dequeue_options => r_dequeue_options,
message_properties => r_message_properties,
payload => o_payload,
msgid => v_message_handle
);
INSERT INTO demo_queue_message_table ( message )
VALUES ( 'Message [' || o_payload.message || '] ' ||
'dequeued at [' || TO_CHAR( SYSTIMESTAMP,