Use a Reference Feed

How to use a reference data feed to perform temporal lookups to enrich events.

Introduction

Reference feeds are temporal stores of reference data that a translation can look up to enhance an Event with additional data. For example, rather than storing a person’s full name and phone number in every event, we can just store their user id and, based on this value, look up the associated user data and decorate the event. In the description below, we will make use of the GeoHost-V1.0-REFERENCE reference feed defined in separate HOWTO document.

Using a Reference Feed

To use a Reference Feed, one uses the Stroom xslt function stroom:lookup(). This function is found within the xml namespace xmlns:stroom=“stroom”.

The lookup function has two mandatory arguments and three optional as per

  • lookup(String map, String key) Look up a reference data map using the period start time
  • lookup(String map, String key, String time) Look up a reference data map using a specified time, e.g. the event time
  • lookup(String map, String key, String time, Boolean ignoreWarnings) Look up a reference data map using a specified time, e.g. the event time, and ignore any warnings generated by a failed lookup
  • lookup(String map, String key, String time, Boolean ignoreWarnings, Boolean trace) Look up a reference data map using a specified time, e.g. the event time, ignore any warnings generated by a failed lookup and get trace information for the path taken to resolve the lookup.

Let’s say, we have the Event fragment

<Event>
    <EventTime>
      <TimeCreated>2020-01-18T20:39:04.000Z</TimeCreated>
    </EventTime>
    <EventSource>
      <System>
        <Name>LinuxWebServer</Name>
        <Environment>Production</Environment>
      </System>
      <Generator>Apache  HTTPD</Generator>
      <Device>
        <HostName>stroomnode00.strmdev00.org</HostName>
        <IPAddress>192.168.2.245</IPAddress>
      </Device>
      <Client>
        <IPAddress>192.168.4.220</IPAddress>
        <Port>61801</Port>
      </Client>
      <Server>
        <HostName>stroomnode00.strmdev00.org</HostName>
        <Port>443</Port>
      </Server>
    ...
    </EventSource>

then the following XSLT would lookup our GeoHost-V1.0-REFERENCE Reference map to find the FQDN of our client

<xsl:variable name="chost" select="stroom:lookup('IP_TO_FQDN', data[@name = 'clientip']/@value)" /> 

And the XSLT to find the IP Address for our Server would be

<xsl:variable name="sipaddr" select="stroom:lookup('FQDN_TO_IP', data[@name = 'vserver']/@value)"  />

In practice, one would also pass the time element as well as setting ignoreWarnings to true(). i.e.

<xsl:variable name="chost" select="stroom:lookup('IP_TO_FQDN', data[@name = 'clientip']/@value, $formattedDate, true())"  />
...
<xsl:variable name="sipaddr" select="stroom:lookup('FQDN_TO_IP',  data[@name = 'vserver']/@value, $formattedDate, true())"  />

Modifying an Event Feed to use a Reference Feed

We will now modify an Event feed to have it lookup our GeoHost-V1.0-REFERENCE reference maps to add additional information to the event. The feed for this exercise is the Apache-SSL-BlackBox-V2.0-EVENTS event feed which processes Apache HTTPD SSL logs which make use of a variation on the BlackBox log format. We will step through a Raw Event stream and modify the translation directly. This way, we see the changes directly.

Using the Explorer pane’s Quick Filter, entry box, we will find the Apache feed.

images/HOWTOs/v6/UI-UseReferenceFeed-00.png

Stroom UI CreateReferenceFeed - Explorer pane Quick Filter

First, select the Quick Filter text entry box and type Apache (the Quick Filter is case insensitive). At this you will see the Explorer pane system group structure reduce down to just the Event Sources.

images/HOWTOs/v6/UI-UseReferenceFeed-01.png

Stroom UI CreateReferenceFeed - Explorer pane Quick Filter -reduced structure

The Explorer pane will display any resources that match our Apache string. Double clicking on the Apache-SSL-BlackBox-V2.0-EVENTS Feed will select it, and bring up the Feed’s tab in the main window.

images/HOWTOs/v6/UI-UseReferenceFeed-03.png

Stroom UI CreateReferenceFeed - Explorer pane Quick Filter -selected feed displayed

We click on the tab’s Data sub-item and then select the most recent Raw Events stream.

images/HOWTOs/v6/UI-UseReferenceFeed-04.png

Stroom UI CreateReferenceFeed - Select specific raw event stream

Now, select the check box on the Raw Events stream in the Specific Stream (middle) pane.

images/HOWTOs/v6/UI-UseReferenceFeed-05.png

Stroom UI CreateReferenceFeed - Selected stream check box

Note that, when we check the box, we see that the Process, Delete and Download icons ( ) are enabled.

We enter Stepping Mode by pressing the stepping button found at the bottom right corner of the Data/Meta-data pane. You will then be requested to choose a pipeline to step with, with the selection already pre-selected

images/HOWTOs/v6/UI-UseReferenceFeed-07.png

Stroom UI CreateReferenceFeed - Stepping pipeline selection

This auto pre-selection is a simple pattern matching action by Stroom. Press OK to start the stepping which displays the pipeline stepping tab

images/HOWTOs/v6/UI-UseReferenceFeed-08.png

Stroom UI CreateReferenceFeed - Stepping pipeline source display

Select the translationFilter element to reveal the translation we plan to modify.

images/HOWTOs/v6/UI-UseReferenceFeed-09.png

Stroom UI CreateReferenceFeed - Stepping pipeline xslt translation filter selected

To bring up the first event from the stream, press the Step Forward button to show

images/HOWTOs/v6/UI-UseReferenceFeed-10.png

Stroom UI CreateReferenceFeed - Stepping pipeline - first event

We scroll the translation pane to show the XSLT segment that deals with the and elements

images/HOWTOs/v6/UI-UseReferenceFeed-11.png

Stroom UI CreateReferenceFeed - Stepping pipeline Client/Server code

and also scroll the translation output pane to display the and elements

images/HOWTOs/v6/UI-UseReferenceFeed-12.png

Stroom UI CreateReferenceFeed - Stepping pipeline translation output pane

We modify the Client xslt segment to change

    <Client>
        <IPAddress>
            <xsl:value-of select="data[@name =  'clientip']/@value"  />
        </IPAddress>
        <Port>
            <xsl:value-of select="data[@name =  'clientport']/@value"  />
        </Port>
    </Client>

to

    <Client>
        <xsl:variable name="chost" select="stroom:lookup('IP_TO_FQDN', data[@name = 'clientip']/@value)" />
        <xsl:if  test="$chost">"
            <HostName>
                <xsl:value-of  select="$chost" />
            </HostName>
        </xsl:if>
            <IPAddress>
                <xsl:value-of select="data[@name =  'clientip']/@value"  />
            </IPAddress>
        <xsl:if test="data[@name =  'clientport']/@value !='-'">
            <Port>
                <xsl:value-of select="data[@name =  'clientport']/@value"  />
            </Port>
        </xsl:if>
    </Client>

and then we press the Refresh Current Step icon .

BUT NOTHING CHANGES !!!

Not quite, you will note in the top right of the translation pane some yellow boxes.

images/HOWTOs/v6/UI-UseReferenceFeed-13.png

Stroom UI CreateReferenceFeed - Stepping pipeline warning

If you click on the top square box, you will see the WARN: 1 selection window

images/HOWTOs/v6/UI-UseReferenceFeed-14.png

Stroom UI CreateReferenceFeed - Stepping pipeline WARN:1

Clicking on the yellow rectangle box below the yellow square box, the translation pane will automatically scroll back to the top of the translation and show the icon.

images/HOWTOs/v6/UI-UseReferenceFeed-16.png

Stroom UI CreateReferenceFeed - Stepping pipeline Warning

Clicking on the icon will reveal the actual warning message.

images/HOWTOs/v6/UI-UseReferenceFeed-17.png

Stroom UI CreateReferenceFeed - Stepping pipeline Warning message

The problem is that, the pipeline cannot find the Reference. To allow a pipeline to find reference feeds, we need to modify the translation parameters within the pipeline. The pipeline for this Event feed is called APACHE-SSLBlack-Box-V2.0-EVENTS. Open this pipeline by double clicking on its entry in the Explorer window

images/HOWTOs/v6/UI-UseReferenceFeed-18.png

Stroom UI CreateReferenceFeed - Launch Pipeline

then switch to the Structure sub-item

images/HOWTOs/v6/UI-UseReferenceFeed-19.png

Stroom UI CreateReferenceFeed - Pipeline display structure

and then select the translationFilter element to reveal

images/HOWTOs/v6/UI-UseReferenceFeed-20.png

Stroom UI CreateReferenceFeed - Pipeline translationFilter structure

The top pane shows the pipeline, in this case, the selected translation filter element of the pipeline. The middle pane shows the Properties for this element - we see that it has an xslt property of the APACHE-BlackBoxV2.0-EVENTS translation. The bottom pane is the one we are interested in. In the case of translation Filters, this pane allows one to associate Reference streams with the translation Filter.

images/HOWTOs/v6/UI-UseReferenceFeed-21.png

Stroom UI CreateReferenceFeed - New Reference selection

So, to associate our GeoHost-V1.0-REFERENCE reference feed with this translation filter, click on the New Reference icon to bring up the New Pipeline Reference selection window

images/HOWTOs/v6/UI-UseReferenceFeed-22.png

Stroom UI CreateReferenceFeed - New Pipeline Reference

For Pipeline: use the menu selector and choose the Reference Loader pipeline and then press OK

images/HOWTOs/v6/UI-UseReferenceFeed-23.png

Stroom UI CreateReferenceFeed - Choose Pipeline

For Feed:, navigate to the reference feed we want, that is the GeoHost-V1.0-REFERENCE reference feed and press OK

images/HOWTOs/v6/UI-UseReferenceFeed-24.png

Stroom UI CreateReferenceFeed - Pipeline translationFilter Feed

And finally, for Stream Type: choose Reference from the drop-down menu

images/HOWTOs/v6/UI-UseReferenceFeed-25.png

Stroom UI CreateReferenceFeed - Pipeline translationFilter Stream Type

then press OK to save the new reference. We now see

images/HOWTOs/v6/UI-UseReferenceFeed-26.png

Stroom UI CreateReferenceFeed - Pipeline translationFilter - Configured

Save these pipeline changes by pressing the icon in the top left then switch back to the APACHE-SSLBlackBox-V2.0-EVENTS stepping tab.

Pressing the Refresh Current Step icon will remove the warning and we now note that the output pane now shows the <Client/HostName> element.

images/HOWTOs/v6/UI-UseReferenceFeed-27.png

Stroom UI CreateReferenceFeed - output pane with Client/HostName element

To complete the translation, we will add reference lookups for the <Server/HostName> element and we will also add <Location> elements to both the <Client> and <Server> elements.

The completed code segment looks like

    ...

    <!-- Set some variables to enable lookup functionality  -->
    <xsl:variable name="formattedDate" select="stroom:format-date(data[@name =  'time']/@value, 'dd/MMM/yyyy:HH:mm:ss XX')" />

    <!--  For Version 2.0 of Apache  audit we  have the virtual  server,  so this  will be our server -->
    <xsl:variable name="vServer" select="data[@name  =  'vserver']/@value"  />
    <xsl:variable name="vServerPort" select="data[@name =  'vserverport']/@value"  />

    ...
 
    <!-- -->
    <Client>
    <!--  See if we  can get the client  HostName from  the given IP address -->
    <xsl:variable name="chost" select="stroom:lookup('IP_TO_FQDN',data[@name  =  'host']/@value, $formattedDate, true())"  />
        <xsl:if  test="$chost">
        <HostName>
            <xsl:value-of  select="$chost" />
        </HostName>
        </xsl:if>
        <IPAddress>
            <xsl:value-of select="data[@name =  'clientip']/@value"  />
        </IPAddress>
        <xsl:if test="data[@name =  'clientport']/@value !='-'">
        <Port>
            <xsl:value-of select="data[@name =  'clientport']/@value"  />
        </Port>
        </xsl:if>

    <!--  See if we  can get the client  Location for the client  FQDN if we  have it -->
    <xsl:variable name="cloc" select="stroom:lookup('FQDN_TO_LOC', $chost,  $formattedDate, true())"  />
        <xsl:if  test="$chost != '' and $cloc">
        <xsl:copy-of select="$cloc"  />
        </xsl:if>
    </Client>

    <!-- -->
    <Server>
        <HostName>
            <xsl:value-of  select="$vServer" />
        </HostName>

    <!--  See if we  can get  the  service  IPAddress -->
    <xsl:variable name="sipaddr" select="stroom:lookup('FQDN_TO_IP',$vServer, $formattedDate,  true())"  />
        <xsl:if  test="$sipaddr">
        <IPAddress>
            <xsl:value-of  select="$sipaddr" />
        </IPAddress>
        </xsl:if>

    <!--  Server Port Number   -->
        <xsl:if test="$vServerPort !='-'">
        <Port>
            <xsl:value-of  select="$vServerPort" />
        </Port>
        </xsl:if>

    <!--  See if we  can get the Server location -->
    <xsl:variable name="sloc"  select="stroom:lookup('FQDN_TO_LOC', $vServer, $formattedDate, true())"  />
        <xsl:if  test="$sloc">
            <xsl:copy-of select="$sloc"  />
        </xsl:if>
    </Server>

Once the above modifications have been made to the XSLT, save these by pressing the icon in the top left corner of the pane.

Note the use of the fourth Boolean ignoreWarnings argument in the lookups. We set this to true() as we may not always have the item in the reference map we want and Warnings consume space in the Stroom store file system.

Thus, the fragment from the output pane for our first event shows

images/HOWTOs/v6/UI-UseReferenceFeed-28.png

Stroom UI CreateReferenceFeed - output pane - first event

and the fragment from the output pane for our last event of this stream shows

images/HOWTOs/v6/UI-UseReferenceFeed-29.png

Stroom UI CreateReferenceFeed - output pane - last event

This is the XSLT Translation.

<?xml version="1.0" encoding="UTF-8" ?>
<xsl:stylesheet xpath-default-namespace="records:2" xmlns="event-logging:3" xmlns:stroom="stroom" xmlns:xsl="http://www.w3.org/1999/XSL/Transform" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:xs="http://www.w3.org/2001/XMLSchema" version="3.0">

  <!-- Ingest the records tree -->
  <xsl:template match="records">
    <Events xsi:schemaLocation="event-logging:3 file://event-logging-v3.2.3.xsd" Version="3.2.3">
        <xsl:apply-templates />
    </Events>
  </xsl:template>

    <!-- Only generate events if we have an url on input -->
    <xsl:template match="record[data[@name = 'url']]">
        <Event>
            <xsl:apply-templates select="." mode="eventTime" />
            <xsl:apply-templates select="." mode="eventSource" />
            <xsl:apply-templates select="." mode="eventDetail" />
        </Event>
    </xsl:template>


    <xsl:template match="node()"  mode="eventTime">
        <EventTime>
            <TimeCreated>
              <xsl:value-of select="stroom:format-date(data[@name = 'time']/@value, 'dd/MMM/yyyy:HH:mm:ss XX')" />
            </TimeCreated>
        </EventTime>
    </xsl:template>

    <xsl:template match="node()"  mode="eventSource">
      <!-- Set some variables to enable lookup functionality  -->
      <xsl:variable name="formattedDate" select="stroom:format-date(data[@name =  'time']/@value, 'dd/MMM/yyyy:HH:mm:ss XX')" />
      <!--  For Version 2.0 of Apache  audit we  have the virtual  server,  so this  will be our server -->
      <xsl:variable name="vServer" select="data[@name  =  'vserver']/@value"  />
      <xsl:variable name="vServerPort" select="data[@name =  'vserverport']/@value"  />
        <EventSource>
            <System>
              <Name>
                <xsl:value-of select="stroom:feed-attribute('System')"  />
              </Name>
              <Environment>
                <xsl:value-of select="stroom:feed-attribute('Environment')"  />
              </Environment>
            </System>
            <Generator>Apache  HTTPD</Generator>
            <Device>
              <HostName>
                <xsl:value-of select="stroom:feed-attribute('MyHost')"  />
              </HostName>
              <IPAddress>
                <xsl:value-of select="stroom:feed-attribute('MyIPAddress')"  />
              </IPAddress>
            </Device>
            <Client>
              <xsl:variable name="chost" select="stroom:lookup('IP_TO_FQDN', data[@name = 'clientip']/@value, $formattedDate, true())" />
              <xsl:if  test="$chost">
                <HostName>
                    <xsl:value-of  select="$chost" />
                </HostName>
              </xsl:if>
                <IPAddress>
                    <xsl:value-of select="data[@name =  'clientip']/@value"  />
                </IPAddress>
              <xsl:if test="data[@name =  'clientport']/@value !='-'">
                <Port>
                    <xsl:value-of select="data[@name =  'clientport']/@value"  />
                </Port>
              </xsl:if>
              <xsl:variable name="cloc" select="stroom:lookup('FQDN_TO_LOC', $chost,  $formattedDate, true())"  />
              <xsl:if  test="$chost != '' and $cloc">
                <xsl:copy-of select="$cloc"  />
              </xsl:if>
            </Client>
            <Server>
                <HostName>
                    <xsl:value-of  select="$vServer" />
                </HostName>
            <!--  See if we  can get  the  service  IPAddress -->
            <xsl:variable name="sipaddr" select="stroom:lookup('FQDN_TO_IP',$vServer, $formattedDate,  true())"  />
            <xsl:if  test="$sipaddr">
                <IPAddress>
                    <xsl:value-of  select="$sipaddr" />
                </IPAddress>
            </xsl:if>
            <!--  Server Port Number   -->
            <xsl:if test="$vServerPort !='-'">
                <Port>
                    <xsl:value-of  select="$vServerPort" />
                </Port>
            </xsl:if>
            <!--  See if we  can get the Server location -->
            <xsl:variable name="sloc"  select="stroom:lookup('FQDN_TO_LOC', $vServer, $formattedDate, true())"  />
            <xsl:if  test="$sloc">
                <xsl:copy-of select="$sloc"  />
            </xsl:if>
            </Server>
            <User>
              <Id>
                <xsl:value-of select="data[@name='user']/@value" />
              </Id>
            </User>
        </EventSource>
    </xsl:template>

    <xsl:template match="node()"  mode="eventDetail">
        <EventDetail>
          <TypeId>SendToWebService</TypeId>
          <Description>Send/Access data to Web Service</Description>
          <Classification>
            <Text>UNCLASSIFIED</Text>
          </Classification>
          <Send>
            <Source>
              <Device>
                <IPAddress>
                    <xsl:value-of select="data[@name = 'clientip']/@value"/>
                </IPAddress>
                <Port>
                    <xsl:value-of select="data[@name = 'vserverport']/@value"/>
                </Port>
              </Device>
            </Source>
            <Destination>
              <Device>
                <HostName>
                    <xsl:value-of select="data[@name = 'vserver']/@value"/>
                </HostName>
                <Port>
                    <xsl:value-of select="data[@name = 'vserverport']/@value"/>
                </Port>
              </Device>
            </Destination>
            <Payload>
              <Resource>
                <URL>
                    <xsl:value-of select="data[@name = 'url']/@value"/>
                </URL>
                <Referrer>
                    <xsl:value-of select="data[@name = 'referer']/@value"/>
                </Referrer>
                <HTTPMethod>
                    <xsl:value-of select="data[@name = 'url']/data[@name = 'httpMethod']/@value"/>
                </HTTPMethod>
                <HTTPVersion>
                    <xsl:value-of select="data[@name = 'url']/data[@name = 'version']/@value"/>
                </HTTPVersion>
                <UserAgent>
                    <xsl:value-of select="data[@name = 'userAgent']/@value"/>
                </UserAgent>
                <InboundSize>
                    <xsl:value-of select="data[@name = 'bytesIn']/@value"/>
                </InboundSize>
                <OutboundSize>
                    <xsl:value-of select="data[@name = 'bytesOut']/@value"/>
                </OutboundSize>
                <OutboundContentSize>
                    <xsl:value-of select="data[@name = 'bytesOutContent']/@value"/>
                </OutboundContentSize>
                <RequestTime>
                    <xsl:value-of select="data[@name = 'timeM']/@value"/>
                </RequestTime>
                <ConnectionStatus>
                    <xsl:value-of select="data[@name = 'constatus']/@value"/>
                </ConnectionStatus>
                <InitialResponseCode>
                    <xsl:value-of select="data[@name = 'responseB']/@value"/>
                </InitialResponseCode>
                <ResponseCode>
                    <xsl:value-of select="data[@name = 'response']/@value"/>
                </ResponseCode>
                <Data Name="Protocol">
                  <xsl:attribute select="data[@name = 'url']/data[@name = 'protocol']/@value" name="Value"/>
                </Data>
              </Resource>
            </Payload>
            <!-- Normally our translation at this point would contain an <Outcome> attribute.
            Since all our sample data includes only successful outcomes we have ommitted the <Outcome> attribute 
            in the translation to minimise complexity-->
          </Send>
        </EventDetail>
    </xsl:template>
</xsl:stylesheet>

Apache BlackBox with Lookups Translation XSLT ( Download ApacheHPPTDwithLookups-TranslationXSLT.xslt )

Troubleshooting lookup issues

If your lookup is not working as expected you can use the 5th argument of the lookup function to help investigate the issue.

If we return to the element of the pipeline and change the xslt from

    <Client>
        <xsl:variable name="chost" select="stroom:lookup('IP_TO_FQDN', data[@name = 'clientip']/@value, $formattedDate, true())" />

to

    <Client>
        <xsl:variable name="chost" select="stroom:lookup('IP_TO_FQDN', data[@name = 'clientip']/@value, $formattedDate, true(), true())" />

and then we press the Refresh Current Step icon

images/HOWTOs/v6/UI-UseReferenceFeed-30.png

Stroom UI CreateReferenceFeed - lookup 5th argument

you will notice the two blue squares at the top right of the code pane

images/HOWTOs/v6/UI-UseReferenceFeed-31.png

Stroom UI CreateReferenceFeed - lookup 5th argument

If you click on the lower blue square then the code screen will reposition to the beginning of the xslt. Note the icon at the top left of the screen. If you hover over this information icon you will see information about the path taken to resolve the lookup. Hopefully this additional information guides resolution of the lookup issue.

images/HOWTOs/v6/UI-UseReferenceFeed-32.png

Stroom UI CreateReferenceFeed - lookup trace information

Once you have completed your troubleshooting you can either remove the 5th argument from the lookup function, or set to false.

Last modified November 26, 2024: Merge branch '7.5' into 7.6 (5fb86ee)