Pipeline

From ESM Wiki

Introduction

Pipeline is the root abstraction for ESM logic: it is what brings engines alive. Its main part is a sequence of engines that are executed in turn, one after another. For each request received by MPP, a pipeline does the following:

  • Receives processing control in some predefined extension points of MPP core logic.
  • Executes a sequence of engines, transferring data from one engine to another.
  • Does additional postprocessing specific to a pipeline type.
  • Returns processing control to MPP core logic.
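
The steps above can be pictured with a minimal Python sketch. It is a conceptual model only, not ESM code: the function name run_pipeline, the context dictionary, and the (id, callable) representation of engines and postprocessors are all assumptions made for illustration.

```python
from typing import Any, Callable, Dict, List, Tuple

# Hypothetical types: an engine reads the shared context and returns a result;
# a postprocessor reads and may modify the context after all engines ran.
Engine = Callable[[Dict[str, Any]], Any]
Postprocessor = Callable[[Dict[str, Any]], None]


def run_pipeline(
    request: Any,
    engines: List[Tuple[str, Engine]],
    postprocessors: List[Postprocessor],
) -> Dict[str, Any]:
    """Run engines in order, then postprocessors, and return the context."""
    context: Dict[str, Any] = {"request": request, "engines": {}}
    for engine_id, engine in engines:
        # Each engine can see the results of the engines that ran before it.
        context["engines"][engine_id] = engine(context)
    for postprocess in postprocessors:
        # Pipeline-specific postprocessing happens after the last engine.
        postprocess(context)
    # Returning corresponds to handing control back to the caller.
    return context


# Usage: the second engine consumes the result of the first one.
example = run_pipeline(
    "hello",
    [
        ("upper", lambda ctx: ctx["request"].upper()),
        ("length", lambda ctx: len(ctx["engines"]["upper"])),
    ],
    [],
)
```

In this model the caller plays the role of MPP core logic: it hands over control by calling run_pipeline and regains it when the function returns.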

Configuration

A pipeline is configured with one or more MPP policy group options. Which options are available depends on the specific pipeline type. For a list of available pipelines see here. One element common to all pipelines is an option that specifies the sequence of engines that the pipeline executes. The name of this option differs between pipelines, but its value syntax is common:

engines = engine_full_id *(1*(space | ",") engine_full_id) ; space- or comma-separated list of full engine IDs

For example, strip body pipeline engines may be specified as follows:

<mppd>
    <groups>
        <group id="default">
            ...
            <strip_body_pipeline>
                salesforce_http.login
                salesforce_http.find_opportunity
                salesforce_http.attach_attachment_to_opportunity
                salesforce_http.create_task_for_opportunity
            </strip_body_pipeline>
            ...
        </group>
    </groups>
</mppd>
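
To make the value syntax concrete, here is a small Python sketch that splits an option value into full engine IDs. It is an illustration, not the parser ESM uses: the function name parse_engine_ids is hypothetical, and treating newlines and tabs as separators is an assumption based on the multi-line example above.

```python
import re
from typing import List


def parse_engine_ids(value: str) -> List[str]:
    """Split an engines option value on runs of whitespace and/or commas."""
    # Leading or trailing separators yield empty strings, which are dropped.
    return [item for item in re.split(r"[\s,]+", value) if item]


# Usage: a multi-line value like the one in the example above.
ids = parse_engine_ids(
    """
    salesforce_http.login
    salesforce_http.find_opportunity
    """
)
```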

If an engine is not listed in any enabled pipeline, it is neither loaded nor executed.

Postprocessor

A postprocessor comes into play after all engines in a pipeline are done. Its job is to perform additional pipeline-specific processing that is normally not allowed for engines. A pipeline may have multiple postprocessors. The available postprocessors and their configuration depend on the pipeline type. This article describes common classes of postprocessors, classified by the type of processing they perform.

Decider

A decider is a postprocessor that makes a decision that matters for further processing in the context of MPP core logic. It translates the results of the engines in a pipeline into MPP core logic results. For example, the WBL pipeline has a result-to-condition translator that translates engine results into a WBL condition. MPP core logic then takes the appropriate processing path depending on the WBL condition.

Replacer

A replacer is a postprocessor that replaces request properties with the results of engines in a pipeline. For example, the strip body pipeline has a body part replacer that replaces email body parts with results of engines in the pipeline.

Back inserter

A back inserter is a postprocessor that inserts new request properties after the existing ones. The new properties are constructed from the results of engines in a pipeline. For example, the strip body pipeline has a headers back inserter that inserts email headers at the end of the existing headers.
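
The difference between a replacer and a back inserter can be sketched in Python as follows. This is a conceptual model under stated assumptions: the request is represented as a plain dictionary with "headers" and "body_parts" lists, and the function names replace_body_part and back_insert_headers are hypothetical, not ESM identifiers.

```python
from typing import Any, Dict, List


def replace_body_part(request: Dict[str, Any], index: int, engine_result: Any) -> Dict[str, Any]:
    """Replacer: overwrite an existing request property with an engine result."""
    parts = list(request["body_parts"])
    parts[index] = engine_result
    # Return a modified copy so the original request stays untouched.
    return {**request, "body_parts": parts}


def back_insert_headers(request: Dict[str, Any], new_headers: List[str]) -> Dict[str, Any]:
    """Back inserter: append new properties after the existing ones."""
    return {**request, "headers": list(request["headers"]) + list(new_headers)}


# Usage with a toy request.
request = {"headers": ["From: a", "To: b"], "body_parts": ["text", "image-bytes"]}
replaced = replace_body_part(request, 1, "link-to-stored-part")
extended = back_insert_headers(request, ["X-Archived: yes"])
```

The replacer changes a property that already exists, while the back inserter leaves existing properties intact and only adds new ones at the end.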

Precondition chaining

Precondition chaining is a principle according to which, if the result of an engine is not calculated because of a precondition, then all subsequent engines in the pipeline that use this result in their preconditions or actions also do not have their actions executed or their results calculated. Effectively, this principle forms chains of connected engines whose execution is governed by the precondition of the first engine in the chain. There is no need to repeat a (potentially complex) precondition in subsequent connected engines. For example:

<mppd>
    <groups>
        <group id="default">
            <strip_body_enabled>yes</strip_body_enabled>
            <strip_body_engines>
                select_body_part
                db.save_body_part
                db.save_body_part_recipient
            </strip_body_engines>
        </group>
    </groups>
    <engines>
        <boorex id="select_body_part">
            <i id="by_file_name">\.gif$</i>
            <result id="is_match">
                <case>
                    <condition>
                             $body_part.malformed $EQ no
                        $AND $body_part.multipart $EQ no
                        $AND $body_part.file_name ~= by_file_name
                    </condition>
                    <result>yes</result>
                </case>
                <result>no</result>
            </result>
        </boorex>
        <mysql id="db">
            <query id="save_body_part">
                <precondition>$engines.select_body_part.is_match $EQ yes</precondition>
                <template>
                    INSERT INTO `body_part` (`data`)
                    VALUES ('${escape $body_part}')
                </template>
                <result id="insert_id">
                    <if_empty_table><result>$insert_id</result></if_empty_table>
                    <if_empty_table><result>$empty</result></if_empty_table>
                </result>
            </query>

            <query id="save_body_part_recipient">
                <template>
                    INSERT INTO `body_part_recipient` (`body_part_id`, `address`)
                    VALUES ($engines.db.save_body_part.insert_id, '${escape $recipient}')
                </template>
            </query>
        </mysql>
    </engines>
</mppd>

In this XML three engines are included in <strip_body_engines>: select_body_part, db.save_body_part and db.save_body_part_recipient. The first engine checks whether a body part matches a condition: it is not malformed, not multipart, and its file name matches the specified regular expression. The first engine stores the result of the check in the variable $engines.select_body_part.is_match. The second engine checks in its precondition whether that result equals “yes”; if so, it executes its action and calculates its result, which is available through the result macro $engines.db.save_body_part.insert_id. The third engine uses the result of the second engine in its action. However, if that result is not calculated because of the precondition in the second engine, the third engine skips its action. The third engine skips its action because of precondition chaining.
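
The chaining behaviour described above can be modelled with a short Python sketch. It is a simplified illustration rather than ESM's implementation: the Engine class, the explicit uses list (ESM presumably derives dependencies from the macros an engine references), and the stand-in actions are all assumptions.

```python
from typing import Any, Callable, Dict, List, Optional, Sequence

Results = Dict[str, Any]


class Engine:
    """Hypothetical engine: an id, an action, and the results it refers to."""

    def __init__(
        self,
        engine_id: str,
        action: Callable[[Results], Any],
        uses: Sequence[str] = (),
        precondition: Optional[Callable[[Results], bool]] = None,
    ) -> None:
        self.engine_id = engine_id
        self.action = action
        self.uses = tuple(uses)
        self.precondition = precondition


def run_chain(engines: List[Engine]) -> Results:
    """Run engines in order, skipping any whose inputs were never calculated."""
    results: Results = {}
    for engine in engines:
        # Chaining: a missing input result means this engine is skipped too.
        if any(name not in results for name in engine.uses):
            continue
        # The engine's own precondition, if any, is checked next.
        if engine.precondition is not None and not engine.precondition(results):
            continue
        results[engine.engine_id] = engine.action(results)
    return results


def build_engines(file_name: str) -> List[Engine]:
    """Mirror the three engines of the XML example with stand-in actions."""
    return [
        Engine("select_body_part",
               lambda r: "yes" if file_name.endswith(".gif") else "no"),
        Engine("db.save_body_part",
               lambda r: 1,  # pretend insert ID
               uses=["select_body_part"],
               precondition=lambda r: r["select_body_part"] == "yes"),
        Engine("db.save_body_part_recipient",
               lambda r: ("recipient row", r["db.save_body_part"]),
               uses=["db.save_body_part"]),
    ]
```

Only the second engine carries a precondition. The third engine has none of its own, yet it is skipped whenever the second engine's result is missing.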

Errors processing

When an error occurs during execution of pipeline engines, or a <throw> option is in effect for a result with validation, execution of the pipeline is terminated. Control is passed to exception engines if they are supported and defined for the pipeline. The job of exception engines is to clean up and take appropriate error actions. The special macro $exception may be used by exception engines to refer to the error message. Exception engines may use results of pipeline engines; however, if a result remains undefined, an engine that uses it is skipped. After the exception engines finish, the pipeline finishes normally. If an error occurs during execution of exception engines, or an exception engine rethrows with the <throw> option, the pipeline finishes with an error condition. Postprocessors are not executed. Control is passed to MPP core logic, which takes the appropriate action depending on the pipeline finish status. Which action MPP core logic takes depends on the pipeline type.

If exception engines are neither supported nor defined, then when an error occurs during execution of pipeline engines, or a <throw> option is in effect, execution of the pipeline is simply terminated with an error condition. From there, processing continues in the same way.

For example consider the following config:

<mppd>
    <groups>
        <group id="default">
            <actions>
                <on_archive_failure>reject</on_archive_failure>
                <on_archive_success>discard</on_archive_success>
            </actions>
            <archive_engines>mysql.insert_entry, http.make_query, mysql.update_entry</archive_engines>
            <archive_exception_engines>mysql.update_entry_with_error</archive_exception_engines>
        </group>
    </groups>
    <engines>
        ...
    </engines>
</mppd>

This is an archive pipeline that consists of three pipeline engines and one exception engine (engine definitions are omitted for simplicity). The first pipeline engine inserts an entry with a progress status into the DB, the second engine queries an HTTP server for a status update for the entry, and the third engine updates the entry status in the DB. The HTTP query is expected to be more likely to fail than the DB query. So when the HTTP query fails, control is passed to the exception engine, which updates the entry with an error status. The pipeline finishes normally and the <on_archive_success> action is taken. But if the DB query fails or the exception engine decides to rethrow, the pipeline finishes with an error and the <on_archive_failure> action is taken.
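
The control flow of error processing can be summarised in a Python sketch. It is a conceptual model only: the function name run_with_exception_engines, the "success"/"failure" return values, and the "exception" key standing in for the $exception macro are assumptions, and the skipping of exception engines whose inputs are undefined is not modelled.

```python
from typing import Any, Callable, Dict, List, Tuple

Results = Dict[str, Any]
Engine = Callable[[Results], Any]


def run_with_exception_engines(
    engines: List[Tuple[str, Engine]],
    exception_engines: List[Tuple[str, Engine]],
) -> str:
    """Return "success" if the pipeline finishes normally, else "failure"."""
    results: Results = {}
    try:
        for engine_id, engine in engines:
            results[engine_id] = engine(results)
        return "success"
    except Exception as error:
        if not exception_engines:
            # No exception engines defined: terminate with an error condition.
            return "failure"
        # Stand-in for the $exception macro: keep the error message around.
        results["exception"] = str(error)
    try:
        for engine_id, engine in exception_engines:
            results[engine_id] = engine(results)
    except Exception:
        # An exception engine failed or rethrew: finish with an error.
        return "failure"
    # Exception engines completed, so the pipeline finishes normally.
    return "success"
```

Mapped onto the config above, "success" corresponds to taking the <on_archive_success> action and "failure" to taking <on_archive_failure>.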
