Pipeline Recipies

A set of basic pipeline structure recipies for common use cases.

The following are a basic set of pipeline recipes for doing typical tasks in Stroom. Is it not an exhaustive list as the possibilities with Pipelines are vast. They are intended as a rough guide to get you started with building Pipelines.

Data Ingest and Transformation

CSV to Normalised XML

  1. CSV data is ingested.
  2. The Data Splitter parser parses the records and fields into records format XML using an XML based TextConverter document.
  3. The first XSLTFilter is used to normalise the events in records XML into event-logging XML.
  4. The second XSLTFilter is used to decorate the events with additional data, e.g. <UserDetails> using reference data lookups.
  5. The SchemaFilter ensures that the XML output by the stages of XSLT transformation conforms to the event-logging XMLSchema.
  6. The XML events are then written out as an Event Stream to the Stream store.

Configured Content

  • Data Splitter - A TextConverter containing XML conforming to data-splitter:3.
  • Normalise - An XSLT transforming records:2 => event-logging:3.
  • Decorate - An XSLT transforming event-logging:3 => event-logging:3.
  • SchemaFilter - XMLSchema event-logging:3

JSON to Normalised XML

The same as ingesting CSV data above, except the input JSON is converted into an XML representation of the JSON by the JSONParser. The Normalise XSLTFilter will be specific to the format of the JSON being ingested. The Decorate) XSLTFilter will likely be identical to that used for the CSV ingest above, demonstrating reuse of pipeline element content.

Configured Content

  • Normalise - An XSLT transforming http://www.w3.org/2013/XSL/json => event-logging:3.
  • Decorate - An XSLT transforming event-logging:3 => event-logging:3.
  • SchemaFilter - XMLSchema event-logging:3

XML (not event-logging) to Normalised XML

As above except that the input data is already XML, though not in event-logging format. The XMLParser simply reads the XML character data and converts it to XML SAX events for processing. The Normalise XSLTFilter will be specific to the format of this XML and will transform it into event-logging format.

Configured Content

  • Normalise - An XSLT transforming a 3rd party schema => event-logging:3.
  • Decorate - An XSLT transforming event-logging:3 => event-logging:3.
  • SchemaFilter - XMLSchema event-logging:3

XML (event-logging) to Normalised XML

As above except that the input data is already in event-logging XML format, so no normalisation is required. Decoration is still needed though.

Configured Content

XML Fragments to Normalised XML

XML Fragments are where the input data looks like:

<Event>
  ...
</Event>
<Event>
  ...
</Event>

In other words, it is technically badly formed XML as it has no root element or declaration. This format is however easier for client systems to send as they can send multiple <Event> blocks in one stream (e.g. just appending them together in a rolled log file) but don’t need to wrap them with an outer <Events> element.

The XMLFragmentParser understands this format and will add the wrapping element to make well-formed XML. If the XML fragments are already in event-logging format then no Normalise XSLTFilter is required.

Configured Content

  • XMLFragParser - Content similar to:
    <?xml version="1.1" encoding="utf-8"?>
    <!DOCTYPE Records [
    <!ENTITY fragment SYSTEM "fragment">]>
    <Events
        xmlns="event-logging:3"
        xmlns:stroom="stroom"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="event-logging:3 file://event-logging-v3.4.2.xsd"
        Version="3.4.2">
    &fragment;
    </Events>
    
  • Decorate - An XSLT transforming event-logging:3 => event-logging:3.
  • SchemaFilter - XMLSchema event-logging:3

Handling Malformed Data

Cleaning Malformed XML data

In some cases client systems may send XML containing characters that are not supported by the XML standard. These can be removed using the InvalidXMLCharFilterReader .

The input data may also be known to contain other sets of characters that will cause problems in processing. The FindReplaceFilter can be used to remove/replace either a fixed string or a Regex pattern.

[Pipeline truncated]

Raw Streaming

In cases where you want to export the raw (or cooked) data from a feed you can have a very simply pipeline to pipe the source data directly to an appender. This may be so that the raw data can be ingested into another system for analysis. In this case the data is being written to disk using a file appender.

Indexing

XML to Stroom Lucene Index

This use case is for indexing XML event data that had already been normalised using one of the ingest pipelines above. The XSLTFilter is used to transform the event into records format, extracting the fields to be indexed from the event. The IndexingFilter reads the records XML and loads each one into Stroom’s internal Lucene index .

Configured Content

The records:2 XML looks something like this, with each <data> element representing an indexed field value.

<?xml version="1.1" encoding="UTF-8"?>
<records 
    xmlns="records:2"
    xmlns:stroom="stroom"
    xmlns:sm="stroom-meta"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="records:2 file://records-v2.0.xsd"
    version="2.0">
  <record>
    <data name="StreamId" value="1997" />
    <data name="EventId" value="1" />
    <data name="Feed" value="MY_FEED" />
    <data name="EventTime" value="2010-01-01T00:00:00.000Z" />
    <data name="System" value="MySystem" />
    <data name="Generator" value="CSV" />
    <data name="IPAddress" value="1.1.1.1" />
    <data name="UserId" analyzer="KEYWORD" value="user1" />
    <data name="Action" value="Authenticate" />
    <data name="Description" value="Some message 1" />
  </record>
</records>

XML to Stroom Lucene Index (Dynamic)

Dynamic indexing in Stroom allows you to use the XSLT to define the fields that are being indexed and how each field should be indexed. This avoids having to define all the fields up front in the Index and allows for the creation of fields based on the actual data received. The only difference with normal indexing in Stroom is that is uses the DynamicIndexingFilter and rather than transforming the event into records:2 XML, it is transformed into index-documents:1 XML as shown in the example below.

<?xml version="1.1" encoding="UTF-8"?>
<index-documents
    xmlns="index-documents:1"
    xmlns:stroom="stroom"
    xmlns:sm="stroom-meta"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="index-documents:1 file://index-documents-v1.0.xsd"
    version="1.0">
  <document>
    <field><name>StreamId</name><type>Id</type><indexed>true</indexed><stored>true</stored><value>1997</value></field>
    <field><name>EventId</name><type>Id</type><indexed>true</indexed><stored>true</stored><value>1</value></field>
    <field><name>Feed</name><type>Text</type><analyser>Alpha numeric</analyser><indexed>true</indexed><value>MY_FEED</value></field>
    <field><name>EventTime</name><type>Date</type><indexed>true</indexed><value>2010-01-01T00:00:00.000Z</value></field>
    <field><name>System</name><type>Text</type><analyser>Alpha numeric</analyser><indexed>true</indexed><value>MySystem</value></field>
    <field><name>Generator</name><type>Text</type><analyser>Alpha numeric</analyser><indexed>true</indexed><value>CSV</value></field>
    <field><name>IPAddress</name><type>Text</type><indexed>true</indexed><value>1.1.1.1</value></field>
    <field><name>UserId</name><type>Text</type><indexed>true</indexed><value>user1</value></field>
    <field><name>Action</name><type>Text</type><analyser>Alpha numeric</analyser><indexed>true</indexed><value>Authenticate</value></field>
    <field><name>Description</name><type>Text</type><analyser>Alpha numeric</analyser><indexed>true</indexed><value>Some message 1</value></field>
  </document>
</index-documents>

Configured Content

XML to an Elastic Search Index

This use case is for indexing XML event data that had already been normalised using one of the ingest pipelines above. The XSLTFilter is used to transform the event into records format, extracting the fields to be indexed from the event. The ElasticIndexingFilter reads the records XML and loads each one into an external Elasticsearch index .

Configured Content

Search Extraction

Search extraction is the process of combining the data held in the index with data obtained from the original indexed document, i.e. the event. Search extraction is useful when you do not want to store the whole of an event in the index (to reduce storage used) but still want to be able to access all the event data in a Dashboard/View. An extraction pipeline is required to combine data in this way. Search extraction pipelines are referenced in Dashboard and View settings.

Standard Lucene Index Extraction

This is a non-dynamic search extraction pipeline for a Lucene index.

Configured Content

  • XSLTFilter - An XSLT transforming event-logging:3 => records:2.

Dynamic Lucene Index Extraction

This is a dynamic search extraction pipeline for a Lucene index.

Configured Content

  • XSLTFilter - An XSLT transforming event-logging:3 => index-documents:1.

Data Egress

XML to CSV File

An recipe of writing normalised XML events (as produced by an ingest pipeline above) to a file, but in a flat file format like CSV. The XSLTFilter transforms the events XML into CSV data with XSLT including this:

<xsl:output method="text" omit-xml-declaration="yes" indent="no"/>

The TextWriter converts the XML character events into a stream of characters encoded using the desired output character encoding. The data is appended to a file on a file system, with one file per Stream.

Configured Content

XML to JSON Rolling File

This is similar to the above recipe for writing out CSV, except that the XSLTFilter converts the event XML into XML conforming to the https://www.w3.org/2013/XSL/json/ XMLSchema. The JSONWriter can read this format of XML and convert it into JSON using the desired character encoding. The RollingFileAppender will append the encoded JSON character data to a file on the file system that is rolled based on a size/time threshold.

Configured Content

  • XSLTFilter - An XSLT transforming event-logging:3 => http://www.w3.org/2013/XSL/json.
  • SchemaFilter - XMLSchema http://www.w3.org/2013/XSL/json.

XML to HTTP Destination

This recipe is for sending normalised XML events to another system over HTTP. The HTTPAppender is configured with the URL and any TLS certificates/keys/credentials.

Reference Data

Reference Loader

A typical pipeline for loading XML reference data (conforming to the reference-data:2 XMLSchema) into the reference data store. The ReferenceDataFilter reads the reference-data:2 format data and loads each entry into the appropriate map in the store.

As an example, the reference-data:2 XML for mapping userIDs to staff numbers looks something like this:

<?xml version="1.1" encoding="UTF-8"?>
<referenceData
    xmlns="reference-data:2"
    xmlns:evt="event-logging:3"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="reference-data:2 file://reference-data-v2.0.1.xsd"
    version="2.0.1">
  <reference>
    <map>USER_ID_TO_STAFF_NO_MAP</map>
    <key>user1</key>
    <value>staff1</value>
  </reference>
  <reference>
    <map>USER_ID_TO_STAFF_NO_MAP</map>
    <key>user2</key>
    <value>staff2</value>
  </reference>
  ...
</referenceData>

Statistics

This recipe converts normalised XML data and converts it into statistic events (confirming to the statistics:4 XMLSchema). Stroom’s Statistic Stores are a way to store aggregated counts or averaged values over time periods. For example you may want counts of certain types of event, aggregated over fixed time buckets. Each XML event is transformed using the XSLTFilter to either return no output or a statistic event. An example of statistics:4 data for two statistic events is:

<?xml version="1.1" encoding="UTF-8"?>
<statistics
    xmlns="statistics:2"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="statistics:2 file://statistics-v2.0.xsd">
  <statistic>
    <time>2023-12-22T00:00:00.000Z</time>
    <count>1</count>
    <tags>
      <tag name="user" value="user1" />
    </tags>
  </statistic>
  <statistic>
    <time>2023-12-23T00:00:00.000Z</time>
    <count>5</count>
    <tags>
      <tag name="user" value="user6" />
    </tags>
  </statistic>
</statistics>

Configured Content

Last modified November 26, 2024: Merge branch '7.5' into 7.6 (5fb86ee)