Thursday, April 13, 2017

Triggers, Jobs, Events, and Queues - Part 2

In my previous post, I talked about the problem of triggers that do things outside of the current transaction.  My example used the HR sample schema.  The problem: send the department manager an e-mail whenever a new employee is added to the manager's department, or an existing employee is transferred.  The first impulse is to write a trigger on the EMPLOYEES table that sends the e-mail itself, but this doesn't do what we want.  Either we get a mutating table error, because the trigger tries to SELECT from the table that is in the middle of being updated, or, if we make it run a procedure in an AUTONOMOUS TRANSACTION, the e-mail gets sent even if we roll back the changes.

So my first solution was to use DBMS_JOB to start a job to send the e-mail.  Since the job runs in a separate session, it doesn't get the mutating table error.  And since the job is not submitted until the current transaction is committed, the manager does not get an e-mail unless the changes are saved. But there is another way.


Method 2:  Advanced Queuing and DBMS_SCHEDULER

While the DBMS_JOB method is simpler, and I honestly would use that method for the given use case, there are two problems with it.  First of all, DBMS_SCHEDULER is preferred over DBMS_JOB for scheduling jobs, and I would not be surprised if Oracle were to deprecate DBMS_JOB someday.  But DBMS_SCHEDULER is not tied to our transaction: a job created with it exists immediately, whether or not the changes are ever committed, so we can't use it directly.  Secondly, DBMS_JOB would be inconvenient for more complex requirements, for instance to conditionally trigger a whole chain of actions on a table update.  DBMS_SCHEDULER is great for more complicated requirements like this.  So how can we use it to schedule work that should happen only when changes to a table are saved, given that it pays no attention to our COMMIT?

The answer is to combine DBMS_SCHEDULER with Oracle Streams Advanced Queuing (AQ).  With AQ, your table trigger can raise an event by enqueuing a message, and by default the enqueue is part of the trigger's transaction, so the message only appears in the queue if the transaction COMMITs.  Besides being able to schedule jobs based on time, DBMS_SCHEDULER can also schedule jobs that run when an event is raised.

So first, let's create a queue to hold the information that needs to be passed to the employee_dept_chg_email procedure.  To do this, we need to grant privileges to some users.  I like to have a user whose purpose is to create and administer queues, then have this user grant privileges on the queues to other users, like our table owner, HR.

CONNECT / AS SYSDBA
CREATE USER aq_admin IDENTIFIED BY aq_admin DEFAULT TABLESPACE users;
GRANT CONNECT TO aq_admin;
GRANT CREATE TYPE TO aq_admin;
GRANT aq_administrator_role TO aq_admin;
ALTER USER aq_admin QUOTA UNLIMITED ON users;
GRANT aq_user_role TO hr;
GRANT EXECUTE ON dbms_aq TO hr;
GRANT CREATE JOB TO hr;

Then as AQ_ADMIN, I create a TYPE to hold the information that needs to be passed to the procedure, a queue table with the type as its payload, and a queue based on the table:

CREATE OR REPLACE TYPE employee_dept_chg_type AS OBJECT (
  department_id NUMBER(4),
  first_name    VARCHAR2(20),
  last_name     VARCHAR2(25),
  job_id        VARCHAR2(10)
);
/
/* Multiple consumers must be true to use this queue to start jobs. */
EXECUTE DBMS_AQADM.create_queue_table ( -
   queue_table            =>  'aq_admin.employee_dept_chg_tab', -
   queue_payload_type     =>  'aq_admin.employee_dept_chg_type', -
   multiple_consumers     =>  TRUE -
 );

EXECUTE DBMS_AQADM.create_queue ( -
   queue_name            =>  'aq_admin.employee_dept_chg_queue', -
   queue_table           =>  'aq_admin.employee_dept_chg_tab');

EXECUTE DBMS_AQADM.start_queue ( -
   queue_name         => 'aq_admin.employee_dept_chg_queue', -
   enqueue            => TRUE);

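Before granting anything, it can be worth confirming that the queue really exists and is started.  A minimal check, assuming you are still connected as AQ_ADMIN and that the standard USER_QUEUES dictionary view is visible to that user:

```sql
-- Run as AQ_ADMIN.  Lists the queues built on the new queue table.
-- Expect the normal queue plus the exception queue that Oracle
-- creates automatically for every queue table.
SELECT name,
       queue_type,
       enqueue_enabled,
       dequeue_enabled
  FROM user_queues
 WHERE queue_table = 'EMPLOYEE_DEPT_CHG_TAB'
 ORDER BY name;
```

EMPLOYEE_DEPT_CHG_QUEUE should show up as a normal queue with both enqueue and dequeue enabled.
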
Finally, I grant HR privileges on the type and the queue.  (The roles and privileges HR needs for DBMS_AQ and DBMS_SCHEDULER were already granted above.)

GRANT EXECUTE ON employee_dept_chg_type TO hr;
EXECUTE DBMS_AQADM.grant_queue_privilege ( -
   privilege     =>     'ALL', -
   queue_name    =>     'aq_admin.employee_dept_chg_queue', -
   grantee       =>     'hr', -
   grant_option  =>      FALSE);

Now HR can rewrite the trigger to enqueue a message:

CREATE OR REPLACE TRIGGER employees_manager_change_email
  AFTER INSERT OR UPDATE OF department_id
  ON employees
  FOR EACH ROW
DECLARE
  employee_dept_chg_msg aq_admin.employee_dept_chg_type;
  l_enqueue_options     DBMS_AQ.enqueue_options_t;
  l_message_properties  DBMS_AQ.message_properties_t;
  l_message_handle      RAW(16);
BEGIN
  IF :NEW.department_id IS NOT NULL AND
     (INSERTING OR
      (UPDATING AND :NEW.department_id <> NVL(:OLD.department_id,0))) THEN
    employee_dept_chg_msg := new aq_admin.employee_dept_chg_type(
      :new.department_id,
      :new.first_name,
      :new.last_name,
      :new.job_id
      );
    DBMS_AQ.ENQUEUE (queue_name         => 'aq_admin.employee_dept_chg_queue',
                     payload            => employee_dept_chg_msg,
                     enqueue_options    => l_enqueue_options,
                     message_properties => l_message_properties,
                     msgid              => l_message_handle
                     );
  END IF;
END;
/

By default, the enqueue is part of the transaction that fired the trigger, so if that transaction is rolled back, the message is never placed in the queue.  This is what we want: no e-mail is sent unless the change to the employee's department is committed.

We need to change the procedure a bit so that it takes the message type as its single parameter instead of the individual parameters.

CREATE OR REPLACE PROCEDURE employee_dept_chg_email (
  msg  IN aq_admin.employee_dept_chg_type
)
IS
  manager_first_name  employees.first_name%TYPE;
  manager_last_name   employees.last_name%TYPE;
  manager_email       employees.email%TYPE;
BEGIN
  SELECT   first_name, last_name, email
    INTO   manager_first_name, manager_last_name, manager_email
    FROM   employees
   WHERE   employee_id = (SELECT   manager_id
                            FROM   departments
                           WHERE   department_id = msg.department_id);

  send_mail (
    p_from      => 'HR',
    p_to        => manager_email,
    p_subject   => 'New Employee in your Department',
    p_text      =>    'Please welcome '
                   || msg.first_name
                   || ' '
                   || msg.last_name
                   || ' to Department '
                   || TO_CHAR (msg.department_id)
                   || ' as a '
                   || msg.job_id
                   || '.'
  );
END employee_dept_chg_email;
/

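Since the scheduler will eventually call this procedure for us, it helps to know that it works on its own first.  Here is a small sketch that builds a message by hand and calls the procedure directly as HR.  The values are made up for illustration (department 60 and job IT_PROG exist in the standard HR sample data, but check your own copy), and note that this really does send an e-mail to that department's manager:

```sql
-- Run as HR.  Calls the procedure directly, bypassing the queue and the job.
-- The department, name and job values are placeholders for testing only.
BEGIN
  employee_dept_chg_email (
    aq_admin.employee_dept_chg_type (60, 'Test', 'Person', 'IT_PROG')
  );
END;
/
```

If this fails, for example because the department has no manager, the scheduler job will fail the same way later, and it is much easier to debug here.
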
Now we create the DBMS_SCHEDULER job that will be run whenever a message is placed in the queue.  This is done in several parts.  We create a schedule, then a program to reference the procedure that sends the e-mail, and then the job itself referencing the schedule and program.  We create these separately because the program gets the parameter it passes to the procedure from the payload of the queued message.  We also set an attribute on the job to tell Oracle that each message should start a new instance of the job; by default, if the job is already running when another message arrives, the Scheduler does not start it again.

BEGIN
  DBMS_SCHEDULER.create_event_schedule (
    schedule_name   => 'hr.employee_dept_chg_schedule',
    start_date      => SYSTIMESTAMP,
    queue_spec      => 'aq_admin.employee_dept_chg_queue'
  );
  DBMS_SCHEDULER.create_program (
    program_name          => 'hr.employee_dept_chg_pgm',
    program_type          => 'STORED_PROCEDURE',
    program_action        => 'hr.employee_dept_chg_email',
    number_of_arguments   => 1,
    enabled               => FALSE,
    comments              => 'Sends an e-mail to notify department manager of new employee.'
  );
  /* This says to get the first argument for the procedure from the event
   * message. */
  DBMS_SCHEDULER.define_metadata_argument (
    program_name         => 'hr.employee_dept_chg_pgm',
    argument_position    => 1,
    metadata_attribute   => 'EVENT_MESSAGE'
  );

  DBMS_SCHEDULER.enable (name => 'hr.employee_dept_chg_pgm');
  DBMS_SCHEDULER.create_job (
    job_name        => 'hr.employee_dept_chg_job',
    program_name    => 'hr.employee_dept_chg_pgm',
    schedule_name   => 'hr.employee_dept_chg_schedule',
    enabled         => FALSE
  );
  /* The parallel_instances attribute says that this job should start a new
   * instance for each message on the queue, even if an instance is already
   * running. */
  DBMS_SCHEDULER.set_attribute ('hr.employee_dept_chg_job',
                                'parallel_instances',
                                TRUE);
  DBMS_SCHEDULER.enable (name => 'hr.employee_dept_chg_job');
END;
/

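To see the whole thing work end to end, something like the following can be run as HR once the job is enabled.  This is only a sketch: the employee values are placeholders, it assumes the EMPLOYEES_SEQ sequence and department 60 from the standard HR sample schema, and the committed insert will send a real e-mail.  It should be run after the job exists, because as far as I know an enqueue on a multiple-consumer queue fails with ORA-24033 until something has subscribed to the queue.

```sql
-- Run as HR, after the job above has been enabled.
-- All values are placeholders; the e-mail value must be unique in EMPLOYEES.

-- 1. Insert and roll back: the message is discarded, so no job run is expected.
INSERT INTO employees (employee_id, first_name, last_name, email,
                       hire_date, job_id, department_id)
VALUES (employees_seq.NEXTVAL, 'Test', 'Person', 'TPERSON',
        SYSDATE, 'IT_PROG', 60);
ROLLBACK;

-- 2. Insert and commit: the message is enqueued and the job should fire.
INSERT INTO employees (employee_id, first_name, last_name, email,
                       hire_date, job_id, department_id)
VALUES (employees_seq.NEXTVAL, 'Test', 'Person', 'TPERSON',
        SYSDATE, 'IT_PROG', 60);
COMMIT;

-- 3. A short while later, look at the job's run history.
SELECT job_name, job_subname, status, actual_start_date, additional_info
  FROM user_scheduler_job_run_details
 WHERE job_name = 'EMPLOYEE_DEPT_CHG_JOB'
 ORDER BY actual_start_date DESC;

-- 4. Remove the test row (the trigger does not fire on DELETE).
DELETE FROM employees WHERE email = 'TPERSON';
COMMIT;
```

Only the committed insert should produce a row in the run history.  If nothing appears, drop the WHERE clause; I am not certain how the instances started by parallel_instances are named in this view.
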
As you can see, this is quite a bit more involved than simply using DBMS_JOB.  But perhaps you are doing something more involved than simply sending an e-mail?  If so, AQ and SCHEDULER are a very powerful combination to get things to happen automatically when the database changes, but ONLY when the change is saved.
