Pipeline Processing

Creating pipelines to process and transform data.

Pipelines control how data is processed in Stroom. Typically you’ll want to do many of the same things in every pipeline, e.g. similar transformations, indexing and writing out data. You can create a template pipeline and inherit from it, tweaking only what you need for a particular feed. We’re not doing that now because we want to show how to create one from scratch.

Create a pipeline

  1. Create a pipeline by right-clicking our Stroom 101 folder and selecting:

    New
    Pipeline
  2. Call it something like CSV to XML pipeline.

  3. Select Structure from the top of the new tab. This is the most important view for the pipeline because it shows what will actually happen on the pipeline.

We already have a Source element. Unlike most other pipeline elements this isn’t something we need to configure. It’s just there to show the starting point. Data gets into the pipeline via other means - we’ll describe this in detail later.

Add a data splitter

Data splitters are powerful, and there is a lot we can say about them. Here we’re just going to make a basic one.

Create a CSV splitter

We have CSV data in the following form:

id,guid,from_ip,to_ip,application
1,10990cde-1084-4006-aaf3-7fe52b62ce06,159.161.108.105,217.151.32.69,Tres-Zap
2,633aa1a8-04ff-442d-ad9a-03ce9166a63a,210.14.34.58,133.136.48.23,Sub-Ex

To process this we need to know if there’s a header row, and what the delimiters are. This is a job for a Data Splitter.

The splitter is actually a type of Text Converter, so let’s create one of those:

  1. Right-click on our Stroom 101 folder and select:

    New
    Text Converter
  2. Call it something like CSV splitter.

In the new tab you need to tell the Text Converter that it’ll be a Data Splitter:

Click the Settings sub-tab then select Data Splitter in the Converter Type drop-down.

images/quick-start-guide/process/configure-csvSplitter-type.png

Configuring the data splitter

Now go to the Conversion tab. What you need to put in here is specific to the built-in Data Splitter functionality, so I’m just going to tell you what you’re going to need:

<?xml version="1.1" encoding="UTF-8"?>
<dataSplitter
    xmlns="data-splitter:3"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="data-splitter:3 file://data-splitter-v3.0.xsd"
    version="3.0">

  <!-- The first line contains the field names -->
  <split delimiter="\n" maxMatch="1">
    <group>
      <split delimiter="," containerStart="&#34;" containerEnd="&#34;">
        <var id="heading" />
      </split>
    </group>
  </split>

  <!-- All subsequent lines are records -->
  <split delimiter="\n">
    <group>
      <split delimiter="," containerStart="&#34;" containerEnd="&#34;">
        <data name="$heading$1" value="$1" />
      </split>
    </group>
  </split>
</dataSplitter>

This guide assumes you are running the stroom_core_test stack, which has the data-splitter-v3.0.xsd schema pre-loaded.

Save it by clicking the save button.

So we now have a configured, re-usable data splitter for CSV files that have headers. We need to add this to our pipeline as a parser, so head back to the pipeline’s Structure section and add a DSParser.

  1. Right-click the Source element and select:

    Add
    Parser
    DSParser
  2. Call it CSV Parser and click OK.

We need to tell the new CSV parser to use the TextConverter (CSV splitter) we created earlier.

  1. Click on the CSV Parser element and the pane below will show its properties.
  2. Double click the textConverter property and change Value to our CSV splitter entity.
images/quick-start-guide/process/configuring-dsSplitter.png

Configuring the CSV splitter

Now save the pipeline by clicking the save button.

Test the CSV splitter

So now we have CSV data in Stroom and a pipeline that is configured to process CSV data. We’ve done a fair few things so far, but can we be sure the pipeline is correctly configured? We can do some debugging to find out.

In Stroom you can step through your records and see what the output is at each stage. It’s easy to start doing this. First open your CSV_FEED feed, click on the stream in the top pane, then click the big blue stepping button at the bottom right of the bottom data pane.

You’ll be asked to select a pipeline:

images/quick-start-guide/process/configure-debug.png

Selecting a pipeline to step through

Now you get a view that’s similar to your feed view, except it also shows the pipeline. The Stepper allows you to step through each record in the source data, where a record is defined by your Data Splitter parser. The Stepper will highlight the currently selected record/event.

images/quick-start-guide/process/debug-source.png

Stepping - source data

The Stepper also has stepping controls to allow you to move forward/backward through the source data. Click the green step forward button.

You should see the highlighted section advance to the next record/event.

images/quick-start-guide/process/stepping-01.png

Stepping through the CSV data

Click on the CSV parser element. You will now see the stepping view for this element that is split into three panes:

  • Top pane - this shows the content of your CSV parser element, i.e. the TextConverter (CSV splitter) XML. This can be used to modify your TextConverter.
  • Bottom left pane - this shows the input to the pipeline element.
  • Bottom right pane - this shows the output from the pipeline element. The output from the Data Splitter is XML in records format. You can see the schema for records in the XML schemas folder.
images/quick-start-guide/process/stepping-02.png

The output from a working data splitter
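
As a rough sketch of what to expect in the bottom right pane, the first data row of the sample CSV above would come out of the Data Splitter looking something like this. The element names, attribute names and the records:2 namespace match what the XSLT later in this guide selects on. Any further attributes on the root element (such as a version or schema location) are left out here and may appear in real output.

```xml
<?xml version="1.1" encoding="UTF-8"?>
<!-- Illustrative only: one record built from the first data row of the sample CSV -->
<records xmlns="records:2">
  <record>
    <!-- Each name comes from the header row ($heading$1), each value from the data row ($1) -->
    <data name="id" value="1" />
    <data name="guid" value="10990cde-1084-4006-aaf3-7fe52b62ce06" />
    <data name="from_ip" value="159.161.108.105" />
    <data name="to_ip" value="217.151.32.69" />
    <data name="application" value="Tres-Zap" />
  </record>
</records>
```

The header row itself produces no record. It only populates the heading variables that name the fields of every later row.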

If there are any errors then you will see an error icon in the gutter of the top pane. In the example below, an invalid XML element has been added to the Data Splitter content to demonstrate an error occurring.

images/quick-start-guide/process/stepping-error.png

Add XSLT to transform records format XML into something else

XSLT is the language used to transform record/event data from one form into another in Stroom pipelines. An XSLTFilter pipeline element takes XML input and uses an XSLT to transform it into different XML or some other text format.

Create the XSLT filter

This process is very similar to creating the CSV splitter:

  1. Create the XSLT filter
  2. Add it to the pipeline
  3. Step through to make sure it’s doing what we expect

To create the new XSLT entity do the following:

  1. Right click the Stroom 101 folder in the Explorer Tree and select:

    New
    XSL Translation
  2. Name it XSLT.

  3. Click OK.

This will open a new tab for the XSLT entity.

On the new tab ensure the XSLT sub-tab is selected. This is another text editor pane but this one accepts XSLT. This XSLT will be very basic and just takes the record data from the split filter and puts it into fields. The XSLT for this is below but if you’d like to tinker then go ahead.

<?xml version="1.1" encoding="UTF-8" ?>
<xsl:stylesheet
    xpath-default-namespace="records:2"
    xmlns:stroom="stroom"
    xmlns:xsl="http://www.w3.org/1999/XSL/Transform"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    version="2.0">

  <xsl:template match="records">
    <Events>
      <xsl:apply-templates />
    </Events>
  </xsl:template>
  <xsl:template match="record">
    <xsl:variable name="id" select="data[@name='id']/@value" />
    <xsl:variable name="guid" select="data[@name='guid']/@value" />
    <xsl:variable name="from_ip" select="data[@name='from_ip']/@value" />
    <xsl:variable name="to_ip" select="data[@name='to_ip']/@value" />
    <xsl:variable name="application" select="data[@name='application']/@value" />

    <Event>
      <Id><xsl:value-of select="$id" /></Id>
      <Guid><xsl:value-of select="$guid" /></Guid>
      <FromIp><xsl:value-of select="$from_ip" /></FromIp>
      <ToIp><xsl:value-of select="$to_ip" /></ToIp>
      <Application><xsl:value-of select="$application" /></Application>
    </Event>
  </xsl:template>
</xsl:stylesheet>

Make sure you save it by clicking the save button.

Go back to the Structure sub-tab of the pipeline and add an XSLTFilter element downstream of the CSV parser element. Call it something like XSLT filter.

Select the XSLT filter element and configure it to use the actual XSLT you just created by double-clicking xslt in the properties pane at the bottom.

In the dialog make sure you select the XSLT entity you created in the Stroom 101 folder. Save the pipeline.

Test the XSLT filter

We’re going to test this in the same way we tested the CSV splitter, by clicking the large stepping button on the feed data pane. Click the step forward button a few times to make sure it’s working then click on the XSLT element. This time you should see the XSLT filter there too, as well as the basic XML being transformed into more useful XML:

images/quick-start-guide/process/stepping-03.png

Stepping the XSLT filter
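
For reference, stepping the first record through the XSLT above should yield output along these lines. This is a sketch derived by reading the stylesheet, not captured from a running Stroom instance. Whitespace may differ, and the root element may also carry the stroom and xsi namespace declarations that are in scope in the stylesheet.

```xml
<!-- Illustrative only: expected result for the first data row of the sample CSV -->
<Events>
  <Event>
    <Id>1</Id>
    <Guid>10990cde-1084-4006-aaf3-7fe52b62ce06</Guid>
    <FromIp>159.161.108.105</FromIp>
    <ToIp>217.151.32.69</ToIp>
    <Application>Tres-Zap</Application>
  </Event>
</Events>
```

If an element comes out empty, a likely cause is a mismatch between the name in the CSV header row and the name tested in the corresponding data[@name='...'] expression.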

There are a few more things to do to get this pipeline ready for doing this task for real. We need to get this data to a destination.

Outputting the transformed data

The XSLT filter doesn’t actually write XML; it just outputs XML events to the next element in the pipeline. To write these XML events out to a destination you need a writer. If your transformation is producing XML then you need an XMLWriter, if it is producing JSON then you need a JSONWriter, and for plain text you need a TextWriter.

Our XSLT filter element is outputting XML so we will create an XMLWriter.

Create the XML writer

You don’t need to create one outside the pipeline (in the way you did with the CSV splitter and the XSLT filter). Just do the following:

  1. Right click on the XSLT filter element and select:

    Add
    Writer
    XMLWriter
  2. Name it XML writer.

  3. Click OK.

That’s it, no other configuration necessary.

Create the destination

We need to do something with the serialised XML. We’ll write it to a Stream. To do this we create a StreamAppender:

  1. Right click on the XML Writer element and select:

    Add
    Destination
    StreamAppender
  2. Name it Stream appender.

  3. Click OK.

Streams only exist within feeds and have a type. We could set the feed that the stream will be written into but by default the StreamAppender will write to the same Feed as the input stream. We must however set the type of the Stream to distinguish it from the Raw Events Stream that we POSTed to Stroom.

To set the Stream Type do the following:

  1. Click on the Stream appender pipeline element and the pane below will show its properties.
  2. Double click the streamType property and change Value to the Events stream type.

Test the destination

We can test the XML writer and the StreamAppender using the same stepping feature. Make sure you’ve saved the pipeline, then start a new stepping session. If you click on the Stream appender element you’ll see something like this:

images/quick-start-guide/process/stepping-05.png

The final output from the pipeline

Set the pipeline running

Obviously you don’t want to step through your data one by one. This all needs automation, and this is what Processors and Processor Filters are for. The processor works in the background to take any unprocessed streams (as determined by the Processor Filter and its Tracker) and process them through the pipeline. So far everything on our CSV_FEED feed is unprocessed.

Create a processor and filter

Processors are created from the Processors sub-tab of the pipeline.

Click the add button and you will be presented with a Filter Expression Tree. To configure the filter do the following:

  1. Right-click on the root AND operator and click Add Term. A new expression is added to the tree as a child of the operator and it has three dropdowns in it (Field, Condition and Value).
  2. Create an expression term for the Feed:
    1. Field: Feed
    2. Condition: is
    3. Value: CSV_FEED
  3. Create an expression term for the Stream Type:
    1. Field: Type
    2. Condition: =
    3. Value: Raw Events

You only need to set the incoming feed and the stream types:

images/quick-start-guide/process/configure-processor.png

Configure the new processor filter

You will now see the newly created processor and its filter.

images/quick-start-guide/process/show-processors.png

The new processor and filter

Ensure that both the processor and its filter are enabled by clicking the checkbox at the left of the row. This is it, everything we’ve done is about to start working on its own, just like it would in a real configuration.

If you keep refreshing this table it will show you the processing status which should change after a few seconds to show that the data you have uploaded is being or has been processed. The fields in the filter row will have been updated to reflect the new position of the Filter Tracker. Once this has happened you should be able to open the destination feed CSV_FEED and see the output data (or errors if there were any). If the CSV_FEED tab was already open then you will likely need to click refresh on the top pane.

images/quick-start-guide/process/show-output.png

The output of the pipeline

You can see that there are the Raw Events and the processed Events. If you click on the Events then you can see all the XML that we’ve produced.

Now you’ve processed your data you can go ahead and index it.

Last modified November 26, 2024: Merge branch '7.5' into 7.6 (5fb86ee)