Pipelines

Pipelines are the mechanism for processing and transforming ingested data.

Every feed has an associated translation. The translation is used to convert the input text or XML into event logging XML format.

XSLT is used to translate from XML to event logging XML.

1 - Parser

Parsing input data.

The following capabilities are available to parse input data:

  • XML - XML input can be parsed with the XML parser.
  • XML Fragment - Treat input data as an XML fragment, i.e. XML that does not have an XML declaration or root elements.
  • Data Splitter - A delimiter and regular expression based language for turning non-XML data (e.g. CSV) into XML; a minimal sketch follows this list.
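
A minimal Data Splitter sketch for simple CSV data, assuming one record per line with comma-separated values (element names as per the data-splitter:3 schema):

<?xml version="1.1" encoding="UTF-8"?>
<dataSplitter xmlns="data-splitter:3" version="3.0">
  <!-- Match each line using a new line character as the delimiter -->
  <split delimiter="\n">
    <!-- Process the matched line, excluding the delimiter (match group 1) -->
    <group value="$1">
      <!-- Match each comma delimited value -->
      <split delimiter=",">
        <!-- Emit each value as a <data> element -->
        <data value="$1"/>
      </split>
    </group>
  </split>
</dataSplitter>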

1.1 - Context Data

Using a context file to provide stream-specific reference data for lookups.

Input File:

<?xml version="1.0" encoding="UTF-8"?>
<SomeData>
	<SomeEvent>
			<SomeTime>01/01/2009:12:00:01</SomeTime>
			<SomeAction>OPEN</SomeAction>
			<SomeUser>userone</SomeUser>
			<SomeFile>D:\TranslationKit\example\VerySimple\OpenFileEvents.txt</SomeFile>
	</SomeEvent>
</SomeData>

Context File:

<?xml version="1.0" encoding="UTF-8"?>
<SomeContext>
	<Machine>MyMachine</Machine>
</SomeContext>

Context XSLT:

<?xml version="1.0" encoding="UTF-8" ?>
<xsl:stylesheet
	xmlns="reference-data:2"
	xmlns:evt="event-logging:3"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xmlns:xsl="http://www.w3.org/1999/XSL/Transform"
	version="2.0">
		
		<xsl:template match="SomeContext">
			<referenceData 
					xsi:schemaLocation="event-logging:3 file://event-logging-v3.0.0.xsd reference-data:2 file://reference-data-v2.0.1.xsd"
					version="2.0.1">
							
					<xsl:apply-templates/>
			</referenceData>
		</xsl:template>

		<xsl:template match="Machine">
			<reference>
					<map>CONTEXT</map>
					<key>Machine</key>
					<value><xsl:value-of select="."/></value>
			</reference>
		</xsl:template>
		
</xsl:stylesheet>

Context XML Translation:

<?xml version="1.0" encoding="UTF-8"?>
<referenceData xmlns:evt="event-logging:3"
								xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
								xmlns="reference-data:2"
								xsi:schemaLocation="event-logging:3 file://event-logging-v3.0.0.xsd reference-data:2 file://reference-data-v2.0.1.xsd"
								version="2.0.1">
		<reference>
			<map>CONTEXT</map>
			<key>Machine</key>
			<value>MyMachine</value>
		</reference>
</referenceData>

Input File:

<?xml version="1.0" encoding="UTF-8"?>
<SomeData>
	<SomeEvent>
			<SomeTime>01/01/2009:12:00:01</SomeTime>
			<SomeAction>OPEN</SomeAction>
			<SomeUser>userone</SomeUser>
			<SomeFile>D:\TranslationKit\example\VerySimple\OpenFileEvents.txt</SomeFile>
	</SomeEvent>
</SomeData>

Main XSLT (Note the use of the context lookup):

<?xml version="1.0" encoding="UTF-8" ?>
<xsl:stylesheet
	xmlns="event-logging:3"
	xmlns:s="stroom"
	xmlns:xsl="http://www.w3.org/1999/XSL/Transform"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	version="2.0">
	
    <xsl:template match="SomeData">
        <Events xsi:schemaLocation="event-logging:3 file://event-logging-v3.0.0.xsd" Version="3.0.0">
            <xsl:apply-templates/>
        </Events>
    </xsl:template>
    <xsl:template match="SomeEvent">
        <xsl:if test="SomeAction = 'OPEN'">
            <Event>
                <EventTime>
                        <TimeCreated>
                            <xsl:value-of select="s:format-date(SomeTime, 'dd/MM/yyyy:hh:mm:ss')"/>
                        </TimeCreated>
                </EventTime>
				<EventSource>
					<System>Example</System>
					<Environment>Example</Environment>
					<Generator>Very Simple Provider</Generator>
					<Device>
						<IPAddress>182.80.32.132</IPAddress>
						<Location>
							<Country>UK</Country>
							<Site><xsl:value-of select="s:lookup('CONTEXT', 'Machine')"/></Site>
							<Building>Main</Building>
							<Floor>1</Floor>              
							<Room>1aaa</Room>
						</Location>           
					</Device>
					<User><Id><xsl:value-of select="SomeUser"/></Id></User>
				</EventSource>
				<EventDetail>
					<View>
						<Document>
							<Title>UNKNOWN</Title>
							<File>
								<Path><xsl:value-of select="SomeFile"/></Path>
							</File>
						</Document>
					</View>
				</EventDetail>
            </Event>
        </xsl:if>
    </xsl:template>
</xsl:stylesheet>

Main Output XML:

<?xml version="1.0" encoding="UTF-8"?>
<Events xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
				xmlns="event-logging:3"
				xsi:schemaLocation="event-logging:3 file://event-logging-v3.0.0.xsd"
				Version="3.0.0">
		<Event Id="6:1">
			<EventTime>
					<TimeCreated>2009-01-01T00:00:01.000Z</TimeCreated>
			</EventTime>
			<EventSource>
					<System>Example</System>
					<Environment>Example</Environment>
					<Generator>Very Simple Provider</Generator>
					<Device>
						<IPAddress>182.80.32.132</IPAddress>
						<Location>
								<Country>UK</Country>
								<Site>MyMachine</Site>
								<Building>Main</Building>
								<Floor>1</Floor>
								<Room>1aaa</Room>
						</Location>
					</Device>
					<User>
						<Id>userone</Id>
					</User>
			</EventSource>
			<EventDetail>
					<View>
						<Document>
								<Title>UNKNOWN</Title>
								<File>
									<Path>D:\TranslationKit\example\VerySimple\OpenFileEvents.txt</Path>
								</File>
						</Document>
					</View>
			</EventDetail>
		</Event>
</Events>

1.2 - XML Fragments

Handling XML data without root level elements.

Some input XML data may be missing an XML declaration and root level enclosing elements. This data is not a valid XML document and must be treated as an XML fragment. To use XML fragments the input type for a translation must be set to ‘XML Fragment’. A fragment wrapper must be defined in the XML conversion that tells Stroom what declaration and root elements to place around the XML fragment data.

Here is an example:

<?xml version="1.1" encoding="UTF-8"?>
<!DOCTYPE records [
<!ENTITY fragment SYSTEM "fragment">
]>
<records
  xmlns="records:2"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="records:2 file://records-v2.0.xsd"
  version="2.0">
  &fragment;
</records>

During conversion Stroom replaces the fragment text entity with the input XML fragment data. Note that XML fragments must still be well formed so that they can be parsed correctly.
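
For example, with the wrapper above, fragment input such as the following (the record and child element names here are purely illustrative; note the missing XML declaration and root element) would be parsed as a complete document:

<record><time>01/01/2009:12:00:01</time><action>OPEN</action></record>
<record><time>01/01/2009:12:00:02</time><action>CLOSE</action></record>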

2 - XSLT Conversion

Using Extensible Stylesheet Language Transformations (XSLT) to transform data.

Once the text file has been converted into intermediate XML (or the feed is already XML), XSLT is used to translate the XML into event logging XML format.

Event Feeds must be translated into the events schema and Reference Feeds into the reference schema. You can browse documentation relating to the schemas within the application.

Here is an example XSLT:

<?xml version="1.0" encoding="UTF-8" ?>
<xsl:stylesheet
    xmlns="event-logging:3"
    xmlns:s="stroom"
    xmlns:xsl="http://www.w3.org/1999/XSL/Transform"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    version="2.0">

    <xsl:template match="SomeData">
        <Events
        xsi:schemaLocation="event-logging:3 file://event-logging-v3.0.0.xsd"
        Version="3.0.0">
            <xsl:apply-templates/>
        </Events>
    </xsl:template>
    <xsl:template match="SomeEvent">
        <xsl:variable name="dateTime" select="SomeTime"/>
        <xsl:variable name="formattedDateTime" select="s:format-date($dateTime, 'dd/MM/yyyyhh:mm:ss')"/>

        <xsl:if test="SomeAction = 'OPEN'">
            <Event>
            <EventTime>
                <TimeCreated>
                    <xsl:value-of select="$formattedDateTime"/>
                </TimeCreated>
            </EventTime>
            <EventSource>
                <System>Example</System>
                <Environment>Example</Environment>
                <Generator>Very Simple Provider</Generator>
                <Device>
                    <IPAddress>3.3.3.3</IPAddress>
                </Device>
                <User>
                    <Id><xsl:value-of select="SomeUser"/></Id>
                </User>
            </EventSource>
            <EventDetail>
                <View>
                    <Document>
                        <Title>UNKNOWN</Title>
                        <File>
                        <Path><xsl:value-of select="SomeFile"/></Path>
                        </File>
                    </Document>
                </View>
            </EventDetail>
            </Event>
        </xsl:if>
    </xsl:template>
</xsl:stylesheet>

2.1 - XSLT Functions

Custom XSLT functions available in Stroom.

Stroom provides a number of custom XSLT functions which are made available by including the following namespace:

xmlns:s="stroom"

E.g.

<?xml version="1.0" encoding="UTF-8" ?>
<xsl:stylesheet
    xmlns="event-logging:3"
    xmlns:s="stroom"
    xmlns:xsl="http://www.w3.org/1999/XSL/Transform"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    version="2.0">

The following functions are available to aid your translation:

  • bitmap-lookup(String map, String key) - Bitmap based look up against reference data map using the period start time
  • bitmap-lookup(String map, String key, String time) - Bitmap based look up against reference data map using a specified time, e.g. the event time
  • bitmap-lookup(String map, String key, String time, Boolean ignoreWarnings) - Bitmap based look up against reference data map using a specified time, e.g. the event time, and ignore any warnings generated by a failed lookup
  • bitmap-lookup(String map, String key, String time, Boolean ignoreWarnings, Boolean trace) - Bitmap based look up against reference data map using a specified time, e.g. the event time, and ignore any warnings generated by a failed lookup and get trace information for the path taken to resolve the lookup.
  • classification() - The classification of the feed for the data being processed
  • col-from() - The column in the input that the current record begins on (can be 0).
  • col-to() - The column in the input that the current record ends at.
  • current-time() - The current system time
  • current-user() - The current user logged into Stroom (only relevant for interactive use, e.g. search)
  • decode-url(String encodedUrl) - Decode the provided url.
  • dictionary(String name) - Loads the contents of the named dictionary for use within the translation
  • encode-url(String url) - Encode the provided url.
  • feed-attribute(String attributeKey) - NOTE: This function is deprecated, use meta(String key) instead. The value for the supplied feed attributeKey.
  • feed-name() - Name of the feed for the data being processed
  • fetch-json(String url) - Simplistic version of http-call that sends a request to the passed url and converts the JSON response body to XML using json-to-xml. Currently does not support SSL configuration like http-call does.
  • format-date(String date, String pattern) - Format a date that uses the specified pattern using the default time zone
  • format-date(String date, String pattern, String timeZone) - Format a date that uses the specified pattern with the specified time zone
  • format-date(String date, String patternIn, String timeZoneIn, String patternOut, String timeZoneOut) - Parse a date with the specified input pattern and time zone and format the output with the specified output pattern and time zone
  • format-date(String milliseconds) - Format a date that is specified as a number of milliseconds since a standard base time known as “the epoch”, namely January 1, 1970, 00:00:00 GMT
  • get(String key) - Returns the value associated with a key that has been stored in a map using the put() function. The map is in the scope of the current pipeline process so values do not live after the stream has been processed.
  • hash(String value) - Hash a string value using the default SHA-256 algorithm and no salt
  • hash(String value, String algorithm, String salt) - Hash a string value using the specified hashing algorithm and supplied salt value. Supported hashing algorithms include SHA-256, SHA-512, MD5.
  • hex-to-dec(String hex) - Convert hex to dec representation
  • hex-to-oct(String hex) - Convert hex to oct representation
  • host-address(String hostname) - Convert a hostname into an IP address.
  • host-name(String ipAddress) - Convert an IP address into a hostname.
  • http-call(String url, String headers, String mediaType, String data, String clientConfig) - Makes an HTTP(S) request to a remote server.
  • json-to-xml(String json) - Returns an XML representation of the supplied JSON value for use in XPath expressions
  • line-from() - The line in the input that the current record begins on (1 based).
  • line-to() - The line in the input that the current record ends at.
  • link(String url) - Creates a stroom dashboard table link.
  • link(String title, String url) - Creates a stroom dashboard table link.
  • link(String title, String url, String type) - Creates a stroom dashboard table link.
  • log(String severity, String message) - Logs a message to the processing log with the specified severity
  • lookup(String map, String key) - Look up a reference data map using the period start time
  • lookup(String map, String key, String time) - Look up a reference data map using a specified time, e.g. the event time
  • lookup(String map, String key, String time, Boolean ignoreWarnings) - Look up a reference data map using a specified time, e.g. the event time, and ignore any warnings generated by a failed lookup
  • lookup(String map, String key, String time, Boolean ignoreWarnings, Boolean trace) - Look up a reference data map using a specified time, e.g. the event time, ignore any warnings generated by a failed lookup and get trace information for the path taken to resolve the lookup.
  • meta(String key) - Lookup a meta data value for the current stream using the specified key. The key can be Feed, StreamType, CreatedTime, EffectiveTime, Pipeline or any other attribute supplied when the stream was sent to Stroom, e.g. meta(‘System’).
  • meta-keys() - Returns an array of meta keys for the current stream. Each key can then be used to retrieve its corresponding meta value, by calling meta($key).
  • numeric-ip(String ipAddress) - Convert an IP address to a numeric representation for range comparison
  • part-no() - The current part within a multi part aggregated input stream (AKA the substream number) (1 based)
  • parse-uri(String URI) - Returns an XML structure of the URI providing authority, fragment, host, path, port, query, scheme, schemeSpecificPart, and userInfo components if present.
  • pipeline-name() - Get the name of the pipeline currently processing the stream.
  • pointIsInsideXYPolygon(Number xPos, Number yPos, Number[] xPolyData, Number[] yPolyData) - Returns true if the specified point is inside the specified polygon.
  • random() - Get a system generated random number between 0 and 1.
  • record-no() - The current record number within the current part (substream) (1 based).
  • search-id() - Get the id of the batch search when a pipeline is processing as part of a batch search
  • source() - Returns an XML structure with the stroom-meta namespace detailing the current source location.
  • source-id() - Get the id of the current input stream that is being processed
  • stream-id() - An alias for source-id included for backward compatibility.
  • put(String key, String value) - Store a value for use later on in the translation

bitmap-lookup()

The bitmap-lookup() function looks up a bitmap key against reference or context data and, for each set bit position, retrieves a value (which can be an XML node set) and adds it to the resultant XML.

bitmap-lookup(String map, String key)
bitmap-lookup(String map, String key, String time)
bitmap-lookup(String map, String key, String time, Boolean ignoreWarnings)
bitmap-lookup(String map, String key, String time, Boolean ignoreWarnings, Boolean trace)
  • map - The name of the reference data map to perform the lookup against.
  • key - The bitmap value to lookup. This can either be represented as a decimal integer (e.g. 14) or as hexadecimal by prefixing with 0x (e.g. 0xE).
  • time - Determines which set of reference data was effective at the requested time. If no reference data exists with an effective time before the requested time then the lookup will fail. Time is in the format yyyy-MM-dd'T'HH:mm:ss.SSSXX, e.g. 2010-01-01T00:00:00.000Z.
  • ignoreWarnings - If true, any lookup failures will be ignored, else they will be reported as warnings.
  • trace - If true, additional trace information is output as INFO messages.

If the look up fails no result will be returned.

The key is a bitmap expressed as either a decimal integer or a hexadecimal value, e.g. 14/0xE is 1110 as a binary bitmap. For each bit position that is set (i.e. has a binary value of 1) a lookup will be performed using that bit position as the key. In this example, positions 1, 2 & 3 are set so a lookup would be performed for these bit positions. The results of each lookup for the bitmap are concatenated together in bit position order, separated by a space.

If ignoreWarnings is true then any lookup failures will be ignored and it will return the value(s) for the bit positions it was able to lookup.

This function can be useful when you have a set of values that can be represented as a bitmap and you need them to be converted back to individual values. For example if you have a set of additive account permissions (e.g. Admin, ManageUsers, PerformExport, etc.), each of which is associated with a bit position, then a user’s permissions could be defined as a single decimal/hex bitmap value. Thus a bitmap lookup with this value would return all the permissions held by the user.

For example the reference data store may contain:

Key (Bit position) | Value
0                  | Administrator
1                  | Manage_Users
2                  | Perform_Export
3                  | View_Data
4                  | Manage_Jobs
5                  | Delete_Data
6                  | Manage_Volumes

The following are example lookups using the above reference data:

Lookup Key (decimal) | Lookup Key (Hex) | Bitmap  | Result
0                    | 0x0              | 0000000 | -
1                    | 0x1              | 0000001 | Administrator
74                   | 0x4A             | 1001010 | Manage_Users View_Data Manage_Volumes
2                    | 0x2              | 0000010 | Manage_Users
96                   | 0x60             | 1100000 | Delete_Data Manage_Volumes
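
A usage sketch based on the permissions example above (the map name PERMISSIONS and the $permissions variable are assumptions; $formattedDateTime is a pre-formatted event time as per the lookup() examples):

<PermissionsHeld>
  <xsl:value-of select="s:bitmap-lookup('PERMISSIONS', $permissions, $formattedDateTime)"/>
</PermissionsHeld>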

dictionary()

The dictionary() function gets the contents of the specified dictionary for use during translation. The main use for this function is to allow users to abstract the management of a set of keywords from the XSLT so that it is easier for some users to make quick alterations to a dictionary that is used by some XSLT, without the need for the user to understand the complexities of XSLT.
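
A usage sketch (the dictionary name, the SomeAction element and the containment test are assumptions; the function returns the dictionary contents as text):

<xsl:variable name="keywords" select="s:dictionary('Known Keywords')"/>
<xsl:if test="contains($keywords, SomeAction)">
  <Flagged>true</Flagged>
</xsl:if>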

format-date()

The format-date() function takes a pattern and optional time zone arguments and converts the parsed date into the XML standard date format. The pattern must be a Java SimpleDateFormat pattern. If the optional time zone argument is present the pattern must not include the time zone pattern tokens (z and Z). A special time zone value of “GMT/BST” can be used to guess the time zone based on the date (BST during British Summer Time).

E.g. Convert a GMT date time “2009/08/01 12:34:11”

<xsl:value-of select="s:format-date('2009/08/01 12:34:11', 'yyyy/MM/dd HH:mm:ss')"/>

E.g. Convert a GMT or BST date time “2009/08/01 12:34:11”

<xsl:value-of select="s:format-date('2009/08/01 12:34:11', 'yyyy/MM/dd HH:mm:ss', 'GMT/BST')"/>

E.g. Convert a GMT+1:00 date time “2009/08/01 12:34:11”

<xsl:value-of select="s:format-date('2009/08/01 12:34:11', 'yyyy/MM/dd HH:mm:ss', 'GMT+1:00')"/>

E.g. Convert a date time specified as milliseconds since the epoch “1269270011640”

<xsl:value-of select="s:format-date('1269270011640')"/>

The time zone must be as per the rules defined in SimpleDateFormat under General Time Zone syntax.

http-call()

Executes an HTTP(S) request to a remote server and returns the response.

http-call(String url, [String headers], [String mediaType], [String data], [String clientConfig])

The arguments are as follows:

  • url - The URL to send the request to.
  • headers - A newline (&#10;) delimited list of HTTP headers to send. Each header is of the form key:value.
  • mediaType - The media (or MIME) type of the request data, e.g. application/json. If not set application/json; charset=utf-8 will be used.
  • data - The data to send. The data type should be consistent with mediaType. Supplying the data argument means a POST request method will be used rather than the default GET.
  • clientConfig - A JSON object containing the configuration for the HTTP client to use, including any SSL configuration.

The function returns the response as XML with namespace stroom-http. The XML includes the body of the response in addition to the status code, success status, message and any headers.

clientConfig

The client can be configured using a JSON object containing various optional configuration items. The following is an example of the client configuration object with all keys populated.

{
  "callTimeout": "PT30S",
  "connectionTimeout": "PT30S",
  "followRedirects": false,
  "followSslRedirects": false,
  "httpProtocols": [
    "http/2",
    "http/1.1"
  ],
  "readTimeout": "PT30S",
  "retryOnConnectionFailure": true,
  "sslConfig": {
    "keyStorePassword": "password",
    "keyStorePath": "/some/path/client.jks",
    "keyStoreType": "JKS",
    "trustStorePassword": "password",
    "trustStorePath": "/some/path/ca.jks",
    "trustStoreType": "JKS",
    "sslProtocol": "TLSv1.2",
    "hostnameVerificationEnabled": false
  },
  "writeTimeout": "PT30S"
}

If you are using two-way SSL then you may need to set the protocol to HTTP/1.1.

  "httpProtocols": [
    "http/1.1"
  ],

Example output

The following is an example of the XML returned from the http-call function:

<response xmlns="stroom-http">
  <successful>true</successful>
  <code>200</code>
  <message>OK</message>
  <headers>
    <header>
      <key>cache-control</key>
      <value>public, max-age=600</value>
    </header>
    <header>
      <key>connection</key>
      <value>keep-alive</value>
    </header>
    <header>
      <key>content-length</key>
      <value>108</value>
    </header>
    <header>
      <key>content-type</key>
      <value>application/json;charset=iso-8859-1</value>
    </header>
    <header>
      <key>date</key>
      <value>Wed, 29 Jun 2022 13:03:38 GMT</value>
    </header>
    <header>
      <key>expires</key>
      <value>Wed, 29 Jun 2022 13:13:38 GMT</value>
    </header>
    <header>
      <key>server</key>
      <value>nginx/1.21.6</value>
    </header>
    <header>
      <key>vary</key>
      <value>Accept-Encoding</value>
    </header>
    <header>
      <key>x-content-type-options</key>
      <value>nosniff</value>
    </header>
    <header>
      <key>x-frame-options</key>
      <value>sameorigin</value>
    </header>
    <header>
      <key>x-xss-protection</key>
      <value>1; mode=block</value>
    </header>
  </headers>
  <body>{"buildDate":"2022-06-29T09:22:41.541886118Z","buildVersion":"SNAPSHOT","upDate":"2022-06-29T11:06:26.869Z"}</body>
</response>

Example usage

This is an example of how to use the function call in your XSLT. It is recommended to place the clientConfig JSON in a Dictionary to make it easier to edit and to avoid having to escape all the quotes.

  ...
  <xsl:template match="record">
    ...
    <!-- Read the client config from a Dictionary into a variable -->
    <xsl:variable name="clientConfig" select="stroom:dictionary('HTTP Client Config')" />
    <!-- Make the HTTP call and store the response in a variable -->
    <xsl:variable name="response" select="stroom:http-call('https://reqbin.com/echo', null, null, null, $clientConfig)" />
    <!-- Apply 'response' templates to the response -->
    <xsl:apply-templates mode="response" select="$response" />
    ...
  </xsl:template>
  
  <xsl:template mode="response" match="http:response">
    <!-- Extract just the body of the response -->
    <val><xsl:value-of select="./http:body/text()" /></val>
  </xsl:template>
  ...

link()

Create a string that represents a hyperlink for display in a dashboard table.

link(url)
link(title, url)
link(title, url, type)

Example

link('http://www.somehost.com/somepath')
> [http://www.somehost.com/somepath](http://www.somehost.com/somepath)
link('Click Here','http://www.somehost.com/somepath')
> [Click Here](http://www.somehost.com/somepath)
link('Click Here','http://www.somehost.com/somepath', 'dialog')
> [Click Here](http://www.somehost.com/somepath){dialog}
link('Click Here','http://www.somehost.com/somepath', 'dialog|Dialog Title')
> [Click Here](http://www.somehost.com/somepath){dialog|Dialog Title}

Type can be one of:

  • dialog : Display the content of the link URL within a stroom popup dialog.
  • tab : Display the content of the link URL within a stroom tab.
  • browser : Display the content of the link URL within a new browser tab.
  • dashboard : Used to launch a stroom dashboard internally with parameters in the URL.

Both the dialog and tab types allow the default title of the target link to be overridden by specifying a title after a |, e.g. dialog|My Title.

log()

The log() function writes a message to the processing log with the specified severity. Severities of INFO, WARN, ERROR and FATAL can be used. Severities of ERROR and FATAL will result in records being omitted from the output if a RecordOutputFilter is used in the pipeline. The counts for RecWarn and RecError will be affected by warnings and errors generated in this way, so this function is useful for adding business rules to XML output.

E.g. Warn if a SID is not the correct length.

<xsl:if test="string-length($sid) != 7">
  <xsl:value-of select="s:log('WARN', concat($sid, ' is not the correct length'))"/>
</xsl:if>

lookup()

The lookup() function looks up a value (which can be an XML node set) from reference or context data and adds it to the resultant XML.

lookup(String map, String key)
lookup(String map, String key, String time)
lookup(String map, String key, String time, Boolean ignoreWarnings)
lookup(String map, String key, String time, Boolean ignoreWarnings, Boolean trace)
  • map - The name of the reference data map to perform the lookup against.
  • key - The key to lookup. The key can be a simple string, an integer value in a numeric range or a nested lookup key.
  • time - Determines which set of reference data was effective at the requested time. If no reference data exists with an effective time before the requested time then the lookup will fail. Time is in the format yyyy-MM-dd'T'HH:mm:ss.SSSXX, e.g. 2010-01-01T00:00:00.000Z.
  • ignoreWarnings - If true, any lookup failures will be ignored, else they will be reported as warnings.
  • trace - If true, additional trace information is output as INFO messages.

If the look up fails no result will be returned. By testing the result a default value may be output if no result is returned.

E.g. Look up a SID given a PF

<xsl:variable name="pf" select="PFNumber"/>
<xsl:if test="$pf">
   <xsl:variable name="sid" select="s:lookup('PF_TO_SID', $pf, $formattedDateTime)"/>

   <xsl:choose>
      <xsl:when test="$sid">
         <User>
             <Id><xsl:value-of select="$sid"/></Id>
         </User>
      </xsl:when>
      <xsl:otherwise>
         <data name="PFNumber">
            <xsl:attribute name="Value"><xsl:value-of select="$pf"/></xsl:attribute>
         </data>
      </xsl:otherwise>
   </xsl:choose>
</xsl:if>

Range lookups

Reference data entries can either be stored with a single string key or a key range that defines a numeric range, e.g. 1-100. When a lookup is performed the passed key is looked up as if it were a normal string key. If that lookup fails Stroom will try to convert the key to an integer (long) value. If it can be converted to an integer then a second lookup will be performed against entries with key ranges to see if there is a key range that includes the requested key.

Range lookups can be used for looking up an IP address where the reference data values are associated with ranges of IP addresses. In this use case, the IP address must first be converted into a numeric value using numeric-ip(), e.g.:

stroom:lookup('IP_TO_LOCATION', numeric-ip($ipAddress))

Similarly the reference data must be stored with key ranges whose bounds were created using this function.

Nested Maps

The lookup function allows you to perform chained lookups using nested maps. For example you may have a reference data map called USER_ID_TO_LOCATION that maps user IDs to some location information for that user and a map called USER_ID_TO_MANAGER that maps user IDs to the user ID of their manager. If you wanted to decorate a user’s event with the location of their manager you could use a nested map to achieve the lookup chain. To perform the lookup set the map argument to the list of maps in the lookup chain, separated by a /, e.g. USER_ID_TO_MANAGER/USER_ID_TO_LOCATION.

This will perform a lookup against the first map in the list using the requested key. If a value is found the value will be used as the key in a lookup against the next map. The value from each map lookup is used as the key in the next map all the way down the chain. The value from the last lookup is then returned as the result of the lookup() call. If no value is found at any point in the chain then that results in no value being returned from the function.

In order to use nested map lookups each intermediate map must contain simple string values. The last map in the chain can either contain string values or XML fragment values.
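
A usage sketch of the chained lookup described above (the $userId variable is an assumption; xsl:copy-of is used because the final map may hold XML fragment values):

<ManagerLocation>
  <xsl:copy-of select="s:lookup('USER_ID_TO_MANAGER/USER_ID_TO_LOCATION', $userId, $formattedDateTime)"/>
</ManagerLocation>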

put() and get()

You can put values into a map using the put() function. These values can then be retrieved later using the get() function. Values are stored against a key name so that multiple values can be stored. These functions can be used for many purposes but are most commonly used to count a number of records that meet certain criteria.

The map is in the scope of the current pipeline process so values do not live after the stream has been processed. Also, the map will only contain entries that were put() within the current pipeline process.

An example of how to count records is shown below:

<!-- Get the current record count -->
<xsl:variable name="currentCount" select="number(s:get('count'))" />

<!-- Increment the record count -->
<xsl:variable name="count">
  <xsl:choose>
    <xsl:when test="$currentCount">
      <xsl:value-of select="$currentCount + 1" />
    </xsl:when>
    <xsl:otherwise>
      <xsl:value-of select="1" />
    </xsl:otherwise>
  </xsl:choose>
</xsl:variable>

<!-- Store the count for future retrieval -->
<xsl:value-of select="s:put('count', $count)" />

<!-- Output the new count -->
<data name="Count">
  <xsl:attribute name="Value" select="$count" />
</data>

meta-keys()

When calling this function and assigning the result to a variable, you must specify the variable data type of xs:string* (a sequence of strings).

The following fragment is an example of using meta-keys() to emit all meta values for a given stream, into an Event/Meta element:

<Event>
  <xsl:variable name="metaKeys" select="stroom:meta-keys()" as="xs:string*" />
  <Meta>
    <xsl:for-each select="$metaKeys">
      <string key="{.}"><xsl:value-of select="stroom:meta(.)" /></string>
    </xsl:for-each>
  </Meta>
</Event>

parse-uri()

The parse-uri() function takes a Uniform Resource Identifier (URI) in string form and returns an XML node with a namespace of uri containing the URI’s individual components of authority, fragment, host, path, port, query, scheme, schemeSpecificPart and userInfo. See either RFC 2396: Uniform Resource Identifiers (URI): Generic Syntax or Java’s java.net.URI class for details regarding the components.

The following XSLT fragment

<!-- Display and parse the URI contained within the text of the rURI element -->
<xsl:variable name="u" select="s:parseUri(rURI)" />

<URI>
  <xsl:value-of select="rURI" />
</URI>
<URIDetail>
  <xsl:copy-of select="$v"/>
</URIDetail>

given the rURI text contains

   http://foo:bar@w1.superman.com:8080/very/long/path.html?p1=v1&amp;p2=v2#more-details

would provide

<URI>http://foo:bar@w1.superman.com:8080/very/long/path.html?p1=v1&amp;p2=v2#more-details</URI>
<URIDetail>
  <authority xmlns="uri">foo:bar@w1.superman.com:8080</authority>
  <fragment xmlns="uri">more-details</fragment>
  <host xmlns="uri">w1.superman.com</host>
  <path xmlns="uri">/very/long/path.html</path>
  <port xmlns="uri">8080</port>
  <query xmlns="uri">p1=v1&amp;p2=v2</query>
  <scheme xmlns="uri">http</scheme>
  <schemeSpecificPart xmlns="uri">//foo:bar@w1.superman.com:8080/very/long/path.html?p1=v1&amp;p2=v2</schemeSpecificPart>
  <userInfo xmlns="uri">foo:bar</userInfo>
</URIDetail>

pointIsInsideXYPolygon()

Returns true if the specified point is inside the specified polygon. Useful for determining if a user is inside a physical zone based on their location and the boundary of that zone.

pointIsInsideXYPolygon(Number xPos, Number yPos, Number[] xPolyData, Number[] yPolyData)

Arguments:

  • xPos - The X value of the point to be tested.
  • yPos - The Y value of the point to be tested.
  • xPolyData - A sequence of X values that define the polygon.
  • yPolyData - A sequence of Y values that define the polygon.

The list of values supplied for xPolyData must correspond with the list of values supplied for yPolyData. The points that define the polygon must be provided in order, i.e. starting from one point on the polygon and then traveling round the path of the polygon until it gets back to the beginning.
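
A usage sketch testing a point against a 10x10 square (the coordinates and the output element are assumptions; the X and Y polygon data are XPath 2.0 sequence literals):

<xsl:if test="s:pointIsInsideXYPolygon(3, 4, (0, 10, 10, 0), (0, 0, 10, 10))">
  <InZone>true</InZone>
</xsl:if>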

2.2 - XSLT Includes

Using an XSLT import to include XSLT from another translation.

You can use an XSLT import to include XSLT from another translation. E.g.

<xsl:import href="ApacheAccessCommon" />

This would include the XSLT from the ApacheAccessCommon translation.

3 - File Output

Substitution variables for use in output file names and paths.

When outputting files with Stroom, the output file names and paths can include various substitution variables to form the file and path names.

Context Variables

The following replacement variables are specific to the current processing context.

  • ${feed} - The name of the feed that the stream being processed belongs to
  • ${pipeline} - The name of the pipeline that is producing output
  • ${sourceId} - The id of the input data being processed
  • ${partNo} - The part number of the input data being processed where data is in aggregated batches
  • ${searchId} - The id of the batch search being performed. This is only available during a batch search
  • ${node} - The name of the node producing the output

Time Variables

The following replacement variables can be used to include aspects of the current time in UTC.

  • ${year} - Year in 4 digit form, e.g. 2000
  • ${month} - Month of the year padded to 2 digits
  • ${day} - Day of the month padded to 2 digits
  • ${hour} - Hour padded to 2 digits using 24 hour clock, e.g. 22
  • ${minute} - Minute padded to 2 digits
  • ${second} - Second padded to 2 digits
  • ${millis} - Milliseconds padded to 3 digits
  • ${ms} - Milliseconds since the epoch
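
For example, a FileAppender outputPaths value combining context and time variables might be (the path itself is illustrative):

/data/output/${feed}/${year}-${month}-${day}/${node}_${ms}.xml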

System (Environment) Variables

System variables (environment variables) can also be used, e.g. ${TMP}.

File Name References

rolledFileName in RollingFileAppender can use references to the fileName to incorporate parts of the non-rolled file name.

  • ${fileName} - The complete file name
  • ${fileStem} - Part of the file name before the file extension, i.e. everything before the last ‘.’
  • ${fileExtension} - The extension part of the file name, i.e. everything after the last ‘.’
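
For example (an illustrative sketch, assuming a fileName of events.log):

fileName:       events.log
rolledFileName: ${fileStem}_${ms}.${fileExtension}
rolled output:  events_1269270011640.log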

Other Variables

  • ${uuid} - A randomly generated UUID to guarantee unique file names

4 - Element Reference

A reference for all the pipeline elements.

Reader

Reader elements read and transform the data at the character level before it is parsed into a structured form.

BOMRemovalFilterInput

Removes the Byte Order Mark (if present) from the stream.

BadTextXMLFilterReader

TODO - Add description

Element properties:

Name | Description | Default Value
tags | A comma separated list of XML elements between which non-escaped characters will be escaped. | -

FindReplaceFilter

Replaces strings or regexes with new strings.

Element properties:

Name | Description | Default Value
bufferSize | The number of characters to buffer when matching the regex. | 1000
dotAll | Let ‘.’ match all characters in a regex. | false
escapeFind | Whether or not to escape find pattern or text. | true
escapeReplacement | Whether or not to escape replacement text. | true
find | The text or regex pattern to find and replace. | -
maxReplacements | The maximum number of times to try and replace text. There is no limit by default. | -
regex | Whether the pattern should be treated as a literal or a regex. | false
replacement | The replacement text. | -
showReplacementCount | Show total replacement count. | true

InvalidCharFilterReader

TODO - Add description

Element properties:

Name | Description | Default Value
xmlVersion | XML version, e.g. 1.0 or 1.1 | 1.1

InvalidXMLCharFilterReader

Strips out any characters that are not within the standard XML character set.

Element properties:

Name | Description | Default Value
xmlVersion | XML version, e.g. 1.0 or 1.1 | 1.1

Reader

TODO - Add description

Parser

Parser elements parse raw text data that conforms to some kind of structure (e.g. XML, JSON, CSV) into XML events (elements, attributes, text, etc.) that can be further validated or transformed using filters. The choice of Parser will be dictated by the structure of the data. Parsers read the data using the character encoding defined on the feed.

CombinedParser

The original general-purpose reader/parser that covers all source data types but provides less flexibility than the source format-specific parsers such as dsParser.

Element properties:

Name | Description | Default Value
fixInvalidChars | Fix invalid XML characters from the input stream. | false
namePattern | A name pattern to load a text converter dynamically. | -
suppressDocumentNotFoundWarnings | If the text converter cannot be found to match the name pattern suppress warnings. | false
textConverter | The text converter configuration that should be used to parse the input data. | -
type | The parser type, e.g. ‘JSON’, ‘XML’, ‘Data Splitter’. | -

DSParser

A parser for data that uses Data Splitter code.

Element properties:

Name | Description | Default Value
namePattern | A name pattern to load a data splitter dynamically. | -
suppressDocumentNotFoundWarnings | If the data splitter cannot be found to match the name pattern suppress warnings. | false
textConverter | The data splitter configuration that should be used to parse the input data. | -

JSONParser

A built-in parser that converts JSON source data in JSON fragment format into an XML document.

Element properties:

Name | Description | Default Value
addRootObject | Add a root map element. | true
allowBackslashEscapingAnyCharacter | Feature that can be enabled to accept quoting of all characters using the backslash quoting mechanism: if not enabled, only characters that are explicitly listed by the JSON specification can be escaped this way (see the JSON spec for the small list of these characters). | false
allowComments | Feature that determines whether the parser will allow use of Java/C++ style comments (both ‘/’+’*’ and ‘//’ varieties) within parsed content or not. | false
allowMissingValues | Feature that allows support for “missing” values in a JSON array: a missing value meaning a sequence of two commas, without a value in-between but only optional white space. | false
allowNonNumericNumbers | Feature that allows the parser to recognize a set of “Not-a-Number” (NaN) tokens as legal floating number values (similar to how many other data formats and programming language source code allow it). | false
allowNumericLeadingZeros | Feature that determines whether the parser will allow JSON integral numbers to start with additional (ignorable) zeroes (like: 000001). | false
allowSingleQuotes | Feature that determines whether the parser will allow use of single quotes (apostrophe, character ‘'’) for quoting Strings (names and String values). If so, this is in addition to other acceptable markers (but is not allowed by the JSON specification). | false
allowTrailingComma | Feature that determines whether we will allow a single trailing comma following the final value (in an Array) or member (in an Object). These commas will simply be ignored. | false
allowUnquotedControlChars | Feature that determines whether the parser will allow JSON Strings to contain unquoted control characters (ASCII characters with value less than 32, including tab and line feed characters) or not. If the feature is set false, an exception is thrown if such a character is encountered. | false
allowUnquotedFieldNames | Feature that determines whether the parser will allow use of unquoted field names (which is allowed by JavaScript, but not by the JSON specification). | false
allowYamlComments | Feature that determines whether the parser will allow use of YAML comments, ones starting with ‘#’ and continuing until the end of the line. This commenting style is common with scripting languages as well. | false

XMLFragmentParser

A parser to convert multiple XML fragments into an XML document.

Element properties:

Name | Description | Default Value
namePattern | A name pattern to load a text converter dynamically. | -
suppressDocumentNotFoundWarnings | If the text converter cannot be found to match the name pattern suppress warnings. | false
textConverter | The XML fragment wrapper that should be used to wrap the input XML. | -

XMLParser

TODO - Add description

Filter

Filter elements work with XML events that have been generated by a parser. They can consume the events without modifying them (e.g. RecordCountFilter) or modify them in some way (e.g. XSLTFilter). Multiple filters can be used one after another, with each using the output from the last as its input.

ElasticIndexingFilter

TODO - Add description

Element properties:

Name | Description | Default Value
batchSize | Maximum number of documents to index in each bulk request. | 10000
cluster | Target Elasticsearch cluster. | -
indexBaseName | Name of the Elasticsearch index. | -
indexNameDateFieldName | Name of the field containing the DateTime value to use when determining the index date suffix. | @timestamp
indexNameDateFormat | Format of the date to append to the index name (example: -yyyy). If unspecified, no date is appended. | -
indexNameDateMaxFutureOffset | Do not append a time suffix to the index name for events occurring after the current time plus the specified offset. | P1D
indexNameDateMin | Do not append a time suffix to the index name for events occurring before this date. Date is assumed to be in UTC and of the format specified in indexNameDateMinFormat. | -
indexNameDateMinFormat | Date format of the supplied indexNameDateMin property. | yyyy
ingestPipeline | Name of the Elasticsearch ingest pipeline to execute when indexing. | -
purgeOnReprocess | When reprocessing a stream, first delete any documents from the index matching the stream ID. | true
refreshAfterEachBatch | Refresh the index after each batch is processed, making the indexed documents visible to searches. | false

HttpPostFilter

TODO - Add description

Element properties:

Name | Description | Default Value
receivingApiUrl | The URL of the receiving API. | -

IdEnrichmentFilter

TODO - Add description

IndexingFilter

A filter to send source data to an index.

Element properties:

Name | Description | Default Value
index | The index to send records to. | -

RecordCountFilter

TODO - Add description

Element properties:

Name | Description | Default Value
countRead | Is this filter counting records read or records written? | true

RecordOutputFilter

TODO - Add description

ReferenceDataFilter

Takes XML input (conforming to the reference-data:2 schema) and loads the data into the Reference Data Store. Reference data values can be either simple strings or XML fragments.

Element properties:

Name | Description | Default Value
overrideExistingValues | Allow duplicate keys to override existing values? | true
warnOnDuplicateKeys | Warn if there are duplicate keys found in the reference data? | false

SafeXMLFilter

TODO - Add description

SchemaFilter

Checks the format of the source data against one of a number of XML schemas. This ensures that if non-compliant data is generated, it will be flagged as in error and will not be passed to any subsequent processing elements.

Element properties:

Name | Description | Default Value
namespaceURI | Limits the schemas that can be used to validate data to those with a matching namespace URI. | -
schemaGroup | Limits the schemas that can be used to validate data to those with a matching schema group name. | -
schemaLanguage | The schema language that the schema is written in. | http://www.w3.org/2001/XMLSchema
schemaValidation | Should schema validation be performed? | true
systemId | Limits the schemas that can be used to validate data to those with a matching system id. | -

SearchResultOutputFilter

TODO - Add description

SolrIndexingFilter

Delivers source data to the specified index in an external Solr instance/cluster.

Element properties:

Name | Description | Default Value
batchSize | How many documents to send to the index in a single post. | 1000
commitWithinMs | Commit indexed documents within the specified number of milliseconds. | -1
index | The index to send records to. | -
softCommit | Perform a soft commit after every batch so that docs are available for searching immediately (if using NRT replicas). | true

SplitFilter

Splits multi-record source data into smaller groups of records prior to delivery to an XSLT. This allows the XSLT to process data more efficiently than loading a potentially huge input stream into memory.

Element properties:

Name | Description | Default Value
splitCount | The number of elements at the split depth to count before the XML is split. | 10000
splitDepth | The depth of XML elements to split at. | 1
storeLocations | Should this split filter store processing locations? | true

StatisticsFilter

An element to allow the source data (conforming to the statistics XML Schema) to be sent to the MySQL based statistics data store.

Element properties:

Name | Description | Default Value
statisticsDataSource | The statistics data source to record statistics against. | -

StroomStatsFilter

An element to allow the source data (conforming to the statistics XML Schema) to be sent to an external stroom-stats service.

Element properties:

Name | Description | Default Value
flushOnSend | At the end of the stream, wait for acknowledgement from the Kafka broker for all the messages sent. This ensures errors are caught in the pipeline process. | true
kafkaConfig | The Kafka config to use. | -
statisticsDataSource | The stroom-stats data source to record statistics against. | -

XPathExtractionOutputFilter

TODO - Add description

Element properties:

Name | Description | Default Value
multipleValueDelimiter | The string to delimit multiple simple values. | ,

XSLTFilter

An element used to transform XML data from one form to another using XSLT. The specified XSLT can be used to transform the input XML into XML conforming to another schema or into other forms such as JSON, plain text, etc.

Element properties:

Name | Description | Default Value
pipelineReference | A list of places to load reference data from if required. | -
suppressXSLTNotFoundWarnings | If XSLT cannot be found to match the name pattern suppress warnings. | false
usePool | Advanced: Choose whether or not you want to use cached XSLT templates to improve performance. | true
xslt | The XSLT to use. | -
xsltNamePattern | A name pattern to load XSLT dynamically. | -

Writer

Writers consume XML events (from Parsers and Filters) and convert them into a stream of bytes using the character encoding configured on the Writer (if applicable). The output data can then be fed to a Destination.

JSONWriter

Writer to convert XML data conforming to the http://www.w3.org/2013/XSL/json XML Schema into JSON format.

Element properties:

Name | Description | Default Value
encoding | The output character encoding to use. | UTF-8
indentOutput | Should output JSON be indented and include new lines (pretty printed)? | false

TextWriter

Writer to convert XML character data events into plain text output.

Element properties:

Name | Description | Default Value
encoding | The output character encoding to use. | UTF-8
footer | Footer text that can be added to the output at the end. | -
header | Header text that can be added to the output at the start. | -

XMLWriter

Writer to convert XML events data into XML output in the specified character encoding.

Element properties:

Name | Description | Default Value
encoding | The output character encoding to use. | UTF-8
indentOutput | Should output XML be indented and include new lines (pretty printed)? | false
suppressXSLTNotFoundWarnings | If XSLT cannot be found to match the name pattern suppress warnings. | false
xslt | A previously saved XSLT, used to modify the output via xsl:output attributes. | -
xsltNamePattern | A name pattern for dynamic loading of an XSLT, that will modify the output via xsl:output attributes. | -

Destination

Destination elements consume a stream of bytes from a Writer and persist them to a destination. This could be a file on a file system or Stroom’s stream store.

AnnotationWriter

TODO - Add description

FileAppender

A destination used to write an output stream to a file on the file system. If multiple paths are specified in the ‘outputPaths’ property it will pick one at random to write to.

Element properties:

Name | Description | Default Value
filePermissions | Set file system permissions of finished files (example: ‘rwxr–r–’). | -
outputPaths | One or more destination paths for output files separated with commas. Replacement variables can be used in path strings such as ${feed}. | -
rollSize | When the current output file exceeds this size it will be closed and a new one created. | -
splitAggregatedStreams | Choose if you want to split aggregated streams into separate output files. | false
splitRecords | Choose if you want to split individual records into separate output files. | false
useCompression | Apply GZIP compression to output files. | false

HDFSFileAppender

A destination used to write an output stream to a file on a Hadoop Distributed File System. If multiple paths are specified in the ‘outputPaths’ property it will pick one at random.

Element properties:

Name | Description | Default Value
fileSystemUri | URI for the Hadoop Distributed File System (HDFS) to connect to, e.g. hdfs://mynamenode.mydomain.com:8020 | -
outputPaths | One or more destination paths for output files separated with commas. Replacement variables can be used in path strings such as ${feed}. | -
rollSize | When the current output file exceeds this size it will be closed and a new one created. | -
runAsUser | The user to connect to HDFS as. | -
splitAggregatedStreams | Choose if you want to split aggregated streams into separate output files. | false
splitRecords | Choose if you want to split individual records into separate output files. | false

HTTPAppender

A destination used to write an output stream to a remote HTTP(s) server.

Element properties:

Name | Description | Default Value
connectionTimeout | How long to wait before we abort sending data due to connection timeout. | -
contentType | The content type. | application/json
forwardChunkSize | Should data be sent in chunks and if so how big should the chunks be. | -
forwardUrl | The URL to send data to. | -
hostnameVerificationEnabled | Verify host names. | true
httpHeadersIncludeStreamMetaData | Provide stream metadata as HTTP headers. | true
httpHeadersUserDefinedHeader1 | Additional HTTP Header 1, format is ‘HeaderName: HeaderValue’. | -
httpHeadersUserDefinedHeader2 | Additional HTTP Header 2, format is ‘HeaderName: HeaderValue’. | -
httpHeadersUserDefinedHeader3 | Additional HTTP Header 3, format is ‘HeaderName: HeaderValue’. | -
keyStorePassword | The key store password. | -
keyStorePath | The key store file path on the server. | -
keyStoreType | The key store type. | JKS
logMetaKeys | Which meta data values will be logged in the send log. | guid,feed,system,environment,remotehost,remoteaddress
readTimeout | How long to wait for data to be available before closing the connection. | -
requestMethod | The request method, e.g. POST. | POST
rollSize | When the current output exceeds this size it will be closed and a new one created. | -
splitAggregatedStreams | Choose if you want to split aggregated streams into separate output. | false
splitRecords | Choose if you want to split individual records into separate output. | false
sslProtocol | The SSL protocol to use. | TLSv1.2
trustStorePassword | The trust store password. | -
trustStorePath | The trust store file path on the server. | -
trustStoreType | The trust store type. | JKS
useCompression | Should data be compressed when sending. | true
useJvmSslConfig | Use JVM SSL config. Set this to true if the Stroom node has been configured with key/trust stores using java system properties like ‘javax.net.ssl.keyStore’. Set this to false if you are explicitly setting key/trust store properties on this HttpAppender. | true

RollingFileAppender

A destination used to write an output stream to a file on the file system. If multiple paths are specified in the ‘outputPaths’ property it will pick one at random to write to. This is distinct from the FileAppender in that when the rollSize is reached it will move the current file to the path specified in rolledFileName and resume writing to the original path. This allows other processes to follow the changes to a single file path, e.g. when using tail.

Element properties:

Name | Description | Default Value
fileName | Choose the name of the file to write. | -
filePermissions | Set file system permissions of finished files (example: ‘rwxr–r–’). | -
frequency | Choose how frequently files are rolled. | 1h
outputPaths | One or more destination paths for output files separated with commas. Replacement variables can be used in path strings such as ${feed}. | -
rollSize | When the current output file exceeds this size it will be closed and a new one created, e.g. 10M, 1G. | 100M
rolledFileName | Choose the name that files will be renamed to when they are rolled. | -
schedule | Provide a cron expression to determine when files are rolled. | -
useCompression | Apply GZIP compression to output files. | false

RollingStreamAppender

A destination used to write one or more output streams to a new stream which is then rolled when it reaches a certain size or age. A new stream will be created after the size or age criteria has been met.

Element properties:

Name | Description | Default Value
feed | The feed that output stream should be written to. If not specified the feed the input stream belongs to will be used. | -
frequency | Choose how frequently streams are rolled. | 1h
rollSize | Choose the maximum size that a stream can be before it is rolled. | 100M
schedule | Provide a cron expression to determine when streams are rolled. | -
segmentOutput | Should the output stream be marked with indexed segments to allow fast access to individual records? | true
streamType | The stream type that the output stream should be written as. This must be specified. | -

StandardKafkaProducer

TODO - Add description

Element properties:

Name | Description | Default Value
flushOnSend | At the end of the stream, wait for acknowledgement from the Kafka broker for all the messages sent. This ensures errors are caught in the pipeline process. | true
kafkaConfig | Kafka configuration details relating to where and how to send Kafka messages. | -

StreamAppender

TODO - Add description

Element properties:

Name | Description | Default Value
feed | The feed that output stream should be written to. If not specified the feed the input stream belongs to will be used. | -
rollSize | When the current output stream exceeds this size it will be closed and a new one created. | -
segmentOutput | Should the output stream be marked with indexed segments to allow fast access to individual records? | true
splitAggregatedStreams | Choose if you want to split aggregated streams into separate output streams. | false
splitRecords | Choose if you want to split individual records into separate output streams. | false
streamType | The stream type that the output stream should be written as. This must be specified. | -

StroomStatsAppender


A destination used to record statistic events against a stroom-stats data source, with the events being sent as Kafka messages.

Element properties:

Name Description Default Value
flushOnSend At the end of the stream, wait for acknowledgement from the Kafka broker for all the messages sent. This ensures errors are caught in the pipeline process. true
kafkaConfig The Kafka config to use. -
maxRecordCount Choose the maximum number of records or events that a message will contain 1
statisticsDataSource The stroom-stats data source to record statistics against. -

5 - Reference Data

Performing temporal reference data lookups to decorate event data.

In Stroom reference data is primarily used to decorate events using stroom:lookup() calls in XSLTs. For example you may have a reference data feed that associates the FQDN of a device with its physical location. You can then perform a stroom:lookup() in the XSLT to decorate an event with the physical location of a device by looking up the FQDN found in the event.

Reference data is time sensitive and each stream of reference data has an Effective Date set against it. This allows reference data lookups to be performed using the date of the event to ensure the reference data that was actually effective at the time of the event is used.

Using reference data involves the following steps/processes:

  • Ingesting the raw reference data into Stroom.
  • Creating (and processing) a pipeline to transform the raw reference data into reference-data:2 format XML.
  • Creating a reference loader pipeline with a Reference Data Filter element to load cooked reference data into the reference data store.
  • Adding reference pipeline/feeds to an XSLT Filter in your event pipeline.
  • Adding the lookup call to the XSLT.
  • Processing the raw events through the event pipeline.

The process of creating a reference data pipeline is described in the HOWTO linked at the top of this document.

Reference Data Structure

A reference data entry essentially consists of the following:

  • Effective time - The date/time that the entry was effective from, i.e. the time the raw reference data was received.
  • Map name - A unique name for the key/value map that the entry will be stored in. The name only needs to be unique among the maps that may be loaded within a single XSLT Filter, but in practice it makes sense to keep map names globally unique.
  • Key - The text that will be used to lookup the value in the reference data map. Mutually exclusive with Range.
  • Range - The inclusive range of integer keys that the entry applies to. Mutually exclusive with Key.
  • Value - The value can either be simple text, e.g. an IP address, or an XML fragment that can be inserted into another XML document. XML values must be correctly namespaced.

The following is an example of some reference data that has been converted from its raw form into reference-data:2 XML. In this example the reference data contains four entries that each belong to a different map. The entries demonstrate simple values, a key range and an XML fragment value.

<?xml version="1.1" encoding="UTF-8"?>
<referenceData 
    xmlns="reference-data:2" 
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
    xmlns:stroom="stroom" 
    xmlns:evt="event-logging:3" 
    xsi:schemaLocation="reference-data:2 file://reference-data-v2.0.xsd" 
    version="2.0.1">

  <!-- A simple string value -->
  <reference>
    <map>FQDN_TO_IP</map>
    <key>stroomnode00.strmdev00.org</key>
    <value>
      <IPAddress>192.168.2.245</IPAddress>
    </value>
  </reference>

  <!-- A simple string value -->
  <reference>
    <map>IP_TO_FQDN</map>
    <key>192.168.2.245</key>
    <value>
      <HostName>stroomnode00.strmdev00.org</HostName>
    </value>
  </reference>

  <!-- A key range -->
  <reference>
    <map>USER_ID_TO_COUNTRY_CODE</map>
    <range>
      <from>1</from>
      <to>1000</to>
    </range>
    <value>GBR</value>
  </reference>

  <!-- An XML fragment value -->
  <reference>
    <map>FQDN_TO_LOC</map>
    <key>stroomnode00.strmdev00.org</key>
    <value>
      <evt:Location>
        <evt:Country>GBR</evt:Country>
        <evt:Site>Bristol-S00</evt:Site>
        <evt:Building>GZero</evt:Building>
        <evt:Room>R00</evt:Room>
        <evt:TimeZone>+00:00/+01:00</evt:TimeZone>
      </evt:Location>
    </value>
  </reference>
</referenceData>

Reference Data Namespaces

When XML reference data values are created, as in the example XML above, the XML values must be qualified with a namespace to distinguish them from the reference-data:2 XML that surrounds them. In the above example the XML fragment will become as follows when injected into an event:

      <evt:Location xmlns:evt="event-logging:3" >
        <evt:Country>GBR</evt:Country>
        <evt:Site>Bristol-S00</evt:Site>
        <evt:Building>GZero</evt:Building>
        <evt:Room>R00</evt:Room>
        <evt:TimeZone>+00:00/+01:00</evt:TimeZone>
      </evt:Location>

Even if the evt namespace prefix is already declared in the XML that the fragment is being injected into, the declaration made for the reference fragment will be repeated in the destination. While the duplicate namespacing may appear odd, it is valid XML.

The namespacing can also be achieved like this:

<?xml version="1.1" encoding="UTF-8"?>
<referenceData 
    xmlns="reference-data:2" 
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
    xmlns:stroom="stroom" 
    xsi:schemaLocation="reference-data:2 file://reference-data-v2.0.xsd" 
    version="2.0.1">

  <!-- An XML value -->
  <reference>
    <map>FQDN_TO_LOC</map>
    <key>stroomnode00.strmdev00.org</key>
    <value>
      <Location xmlns="event-logging:3">
        <Country>GBR</Country>
        <Site>Bristol-S00</Site>
        <Building>GZero</Building>
        <Room>R00</Room>
        <TimeZone>+00:00/+01:00</TimeZone>
      </Location>
    </value>
  </reference>
</referenceData>

This reference data will be injected into the event XML exactly as it is, i.e.:

      <Location xmlns="event-logging:3">
        <Country>GBR</Country>
        <Site>Bristol-S00</Site>
        <Building>GZero</Building>
        <Room>R00</Room>
        <TimeZone>+00:00/+01:00</TimeZone>
      </Location>

Reference Data Storage

Reference data is stored in two different places on a Stroom node. All reference data is only visible to the node where it is stored, so each node that performs reference data lookups will need to load and store its own reference data. While this results in nodes holding duplicate data, it makes the storage of reference data and its subsequent lookup very performant.

On-Heap Store

The On-Heap store is the reference data store that is held in memory in the Java Heap. This store is volatile and will be lost on shutdown of the node. The On-Heap store is only used for the storage of context data.

Off-Heap Store

The Off-Heap store is the reference data store that is held in memory outside of the Java Heap and is persisted to local disk. As the store is persisted to local disk the reference data will survive a shutdown of the Stroom instance. Storing the data off-heap means Stroom can run with a much smaller Java Heap size.

The Off-Heap store is based on the Lightning Memory-Mapped Database (LMDB). LMDB makes use of the Linux page cache to ensure that hot portions of the reference data are held in the page cache (making use of all available free memory). Infrequently used portions of the reference data will be evicted from the page cache by the Operating System. Given that LMDB utilises the page cache for holding reference data in memory, the more free memory the host has the better, as there will be less shifting of pages in/out of the OS page cache. When storing large amounts of data you may see the OS report very little free memory, as a large amount will be in use by the page cache. This is not an issue as the OS will evict pages when memory is needed for other applications, e.g. the Java Heap.

Local Disk

The Off-Heap store is intended to be located on a local disk on the Stroom node. The location of the store is set using the property stroom.pipeline.referenceData.localDir. Using LMDB on remote storage is NOT advised, see http://www.lmdb.tech/doc. Using the fastest storage available (e.g. fast SSDs) is advised to reduce load times and the cost of looking up data that is not in memory.

Transactions

LMDB is a transactional database with ACID semantics. All interaction with LMDB is done within a read or write transaction. There can only be one write transaction at a time so if there are a number of concurrent reference data loads then they will have to wait in line.

Read transactions, i.e. lookups, are not blocked by each other but may be blocked by a write transaction depending on the value of the system property stroom.pipeline.referenceData.lmdb.readerBlockedByWriter. LMDB can operate such that readers are not blocked by writers, but if a read transaction is open while a write transaction is writing data to the store then the writer is unable to reuse free space (from previous deletes, see Store Size & Compaction) and the store will grow in size. If read transactions are likely while writes are taking place then this can lead to excessive growth of the store. Setting stroom.pipeline.referenceData.lmdb.readerBlockedByWriter to true will block all reads while a load is happening, so any free space can be reused, at the cost of making all lookups wait for the load to complete. The appropriate setting will depend on how likely it is that loads will clash with lookups, and the store size should be monitored.

Read-Ahead Mode

When data is read from the store, if the data is not already in the page cache then it will be read from disk and added to the page cache by the OS. Read-ahead is the process of speculatively reading ahead to load more pages into the page cache than were requested, on the basis that future requests may need them and it is more efficient to read multiple pages at once. If the reference data store is very large, or larger than the available memory, then it is recommended to turn read-ahead off, as otherwise hot reference data will be evicted from the page cache to make room for speculatively read pages that may never be needed. It can be turned off with the system property stroom.pipeline.referenceData.readAheadEnabled.

Key Size

When reference data is created, care must be taken to ensure that the Key used for each entry is less than 507 bytes. For keys containing only ASCII characters this means fewer than 507 characters. Non-ASCII characters occupy more than one byte each, so the maximum key length in characters will be lower. This is a limitation inherent to LMDB.

Commit intervals

The property stroom.pipeline.referenceData.maxPutsBeforeCommit controls the number of entries that are put into the store between each commit. As there can only be one transaction writing to the store at a time, committing periodically allows other processes to jump in and make writes. There is a trade-off though, as reducing the number of entries put between each commit can seriously affect performance. For the fastest single-process performance a value of 0 should be used, which means it will not commit mid-load. This however means all other processes wanting to write to the store will have to wait. Low values (e.g. in the hundreds) mean very frequent commits, so will hamper performance.

Cloning The Off Heap Store

If you are provisioning a new Stroom node it is possible to copy the Off-Heap store from another node. Stroom should not be running on the node being copied from. Simply copy the contents of stroom.pipeline.referenceData.localDir into the same configured location on the new node. The new node will use the copied store and have access to its reference data.

Store Size & Compaction

Due to the way LMDB works the store can only grow in size; it will never shrink, even if reference data is deleted. Deleted data frees up space for new writes to the store, so the space will be reused, but it will never be freed back to the operating system. If there is a regular process of purging old data and adding new reference data then this should not be an issue, as the new reference data will use the space made available by the purged data.

If store size becomes an issue then it is possible to compact the store. lmdb-utils is a package that is available on some package managers and provides an mdb_copy command that can be used with the -c switch to copy the LMDB environment to a new one, compacting it in the process. This should be done while Stroom is shut down to avoid writes happening to the store during the copy.

The following is an example of how to compact the store assuming Stroom has been shut down first.

# Navigate to the 'stroom.pipeline.referenceData.localDir' directory
cd /some/path/to/reference_data
# Verify contents
ls
(out) data.mdb  lock.mdb
# Create a directory to write the compacted file to
mkdir compacted
# Run the compaction, writing the new data.mdb file to the new sub-dir
mdb_copy -c ./ ./compacted
# Delete the existing store
rm data.mdb lock.mdb
# Move the compacted store back in (a lock file will be created as needed)
mv compacted/data.mdb ./
# Remove the created directory
rmdir compacted

Now you can re-start Stroom and it will use the new compacted store, creating a lock file for it.

The compaction process is fast. A test compaction of a 4GB store, compacted down to 1.6GB, took about 7s on HDD (non-flash) storage.

Alternatively, given that the store is essentially a cache and all data can be re-loaded, another option is to delete the contents of stroom.pipeline.referenceData.localDir when Stroom is not running. On boot Stroom will create a brand new empty store and reference data will be re-loaded as required. This approach means all reference data will have to be re-loaded, so lookups will be slower until that has happened.

The Loading Process

Reference data is loaded into the store on demand during the processing of a stroom:lookup() function call. Reference data will only be loaded if it does not already exist in the store; however, it is always loaded as a complete stream, rather than entry by entry.

The test for existence in the store is based on the following criteria:

  • The UUID of the reference loader pipeline.
  • The version of the reference loader pipeline.
  • The Stream ID for the stream of reference data that has been deemed effective for the lookup.
  • The Stream Number (in the case of multi part streams).

If a reference stream has already been loaded matching the above criteria then no additional load is required.

IMPORTANT: As the version of the reference loader pipeline forms part of the criteria, changing the reference loader pipeline, for whatever reason, will invalidate ALL existing reference data associated with it.

Typically the reference loader pipeline is very static so this should not be an issue.

Standard practice is to convert raw reference data into reference-data:2 XML on receipt using a pipeline separate from the reference loader. The reference loader is then only concerned with reading cooked reference-data:2 XML into the Reference Data Filter.

In instances where reference data streams are infrequently used it may be preferable not to convert the raw reference data on receipt, but instead to do it in the reference loader pipeline.

Duplicate Keys

The Reference Data Filter pipeline element has a property overrideExistingValues. If this is set to true and an entry is found in an effective stream with the same key as an entry already loaded, the new entry will overwrite the existing one. Entries are loaded in the order they appear in the reference-data:2 XML document. If set to false then the existing entry will be kept. If warnOnDuplicateKeys is set to true then a warning will be logged for any duplicate keys, whether an overwrite happens or not.
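
For illustration, the following hypothetical reference-data:2 fragment contains two entries with the same map and key. With overrideExistingValues set to true the stored value ends up as 10.0.0.2 (the later entry overwrites the earlier one); with it set to false the first value, 10.0.0.1, is kept:

<!-- Two hypothetical entries sharing the same map and key -->
<reference>
  <map>FQDN_TO_IP</map>
  <key>dupnode.strmdev00.org</key>
  <value>10.0.0.1</value>
</reference>
<reference>
  <map>FQDN_TO_IP</map>
  <key>dupnode.strmdev00.org</key>
  <value>10.0.0.2</value>
</reference>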

Value De-Duplication

Only unique values are held in the store to reduce the storage footprint. This is useful given that reference data updates may typically be received daily, with each update being a full snapshot of the whole reference data set. Without de-duplication this would mean many copies of the same value being loaded into the store; instead the store only holds the first instance of each duplicate value.

Querying the Reference Data Store

The reference data store can be queried within a Dashboard in Stroom by selecting Reference Data Store in the data source selection pop-up. Querying the store is currently an experimental feature and is mostly intended for use in fault finding. Given the localised nature of the reference data store the dashboard can currently only query the store on the node that the user interface is being served from. In a multi-node environment where some nodes are UI only and most are processing only, the UI nodes will have no reference data in their store.

Purging Old Reference Data

Reference data loading and purging is done at the level of a reference stream. Whenever a reference lookup is performed the last accessed time of the reference stream in the store is checked. If it is older than one hour then it will be updated to the current time. This last access time is used to determine reference streams that are no longer in active use and thus can be purged.

The Stroom job Ref Data Off-heap Store Purge is used to perform the purge operation on the Off-Heap reference data store. No purge is required for the On-Heap store as that only holds transient data. When the purge job is run it checks the time since each reference stream was accessed against the purge cut-off age. The purge age is configured via the property stroom.pipeline.referenceData.purgeAge. It is advised to schedule this job for quiet times when it is unlikely to conflict with reference loading operations as they will fight for access to the single write transaction.

Lookups

Lookups are performed in XSLT Filters using the XSLT lookup functions. In order to perform a lookup, one or more reference feeds must be specified on the XSLT Filter pipeline element. Each reference feed is specified along with a reference loader pipeline that will ingest the specified feed (optionally converting it into reference-data:2 XML if it is not already in that form) and pass it into a Reference Data Filter pipeline element.

Reference Feeds & Loaders

In the XSLT Filter pipeline element multiple combinations of feed and reference loader pipeline can be specified. There must be at least one in order to perform lookups. If there are multiple then when a lookup is called for a given time, the effective stream for each feed/loader combination is determined. The effective stream for each feed/loader combination will be loaded into the store, unless it is already present.

When the actual lookup is performed Stroom will try to find the key in each of the effective streams that have been loaded and that contain the map in the lookup call. If the lookup is unsuccessful in the effective stream for the first feed/loader combination then it will try the next, and so on until it has tried all of them. For this reason if you have multiple feed/loader combinations then order is important. It is possible for multiple effective streams to contain the same map/key so a feed/loader combination higher up the list will trump one lower down with the same map/key. Also if you have some lookups that may not return a value and others that should always return a value then the feed/loader for the latter should be higher up the list so it is searched first.

Effective Streams

Reference data lookups have the concept of Effective Streams. An effective stream is the most recent stream for a given Feed that has an effective date that is less than or equal to the date used for the lookup. When performing a lookup, Stroom will search the stream store to find all the effective streams in a time bucket that surrounds the lookup time. These sets of effective streams are cached, so if a new reference stream is created it will not be used until the cached set has expired. To rectify this you can clear the Reference Data - Effective Stream Cache on the Caches screen, accessed from:

Monitoring > Caches

Standard Key/Value Lookups

Standard key/value lookups consist of a simple string key and a value that is either a simple string or an XML fragment. Standard lookups are performed using the various forms of the stroom:lookup() XSLT function.
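
The following is a minimal sketch of a standard lookup, assuming the FQDN_TO_IP and FQDN_TO_LOC maps from the example reference data above and a hard-coded lookup time (in practice the time would be taken from the event being processed):

<?xml version="1.0" encoding="UTF-8" ?>
<xsl:stylesheet
    xmlns="event-logging:3"
    xmlns:stroom="stroom"
    xmlns:xsl="http://www.w3.org/1999/XSL/Transform"
    version="2.0">

  <xsl:template match="SomeEvent">
    <!-- The lookup time determines which reference stream is effective -->
    <xsl:variable name="time" select="'2010-01-01T00:00:00.000Z'"/>

    <!-- xsl:value-of extracts the text of the looked up value -->
    <IPAddress>
      <xsl:value-of select="stroom:lookup('FQDN_TO_IP', 'stroomnode00.strmdev00.org', $time)"/>
    </IPAddress>

    <!-- xsl:copy-of copies an XML fragment value into the output -->
    <xsl:copy-of select="stroom:lookup('FQDN_TO_LOC', 'stroomnode00.strmdev00.org', $time)"/>
  </xsl:template>

</xsl:stylesheet>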

Range Lookups

Range lookups are performed with an integer key against entries that are keyed by a range of integers; the value is either a simple string or an XML fragment. For more detail on range lookups see the XSLT function stroom:lookup().
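
For illustration, reusing the USER_ID_TO_COUNTRY_CODE range entry from the example reference data above (and assuming a $time variable as in the earlier sketch), a range lookup uses the same function with an integer key:

  <!-- The key 42 falls within the 1-1000 range entry, so 'GBR' is returned -->
  <CountryCode>
    <xsl:value-of select="stroom:lookup('USER_ID_TO_COUNTRY_CODE', '42', $time)"/>
  </CountryCode>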

Nested Map Lookups

Nested map lookups involve chaining a number of lookups, with the value of each lookup being used as the key for the next. For more detail on nested lookups see the XSLT function stroom:lookup().
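
As a sketch, the map argument names the chain of maps to traverse; here USERNAME_TO_FQDN is a hypothetical map whose simple string value is used as the key into the FQDN_TO_IP map from the example above (see the stroom:lookup() documentation for the exact chaining syntax):

  <!-- The value returned by USERNAME_TO_FQDN becomes the key for FQDN_TO_IP -->
  <xsl:value-of select="stroom:lookup('USERNAME_TO_FQDN/FQDN_TO_IP', 'userone', $time)"/>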

Bitmap Lookups

A bitmap lookup is a special kind of lookup that actually performs a lookup for each enabled bit position of the passed bitmap value. For more detail on bitmap lookups see the XSLT function stroom:bitmap-lookup().

Values can either be a simple string or an XML fragment.
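
As an illustrative sketch, with a hypothetical map named PERMISSIONS keyed by bit position, looking up the value 9 (binary 1001) performs lookups for bit positions 0 and 3 (assuming a $time variable as in the earlier sketch):

  <!-- 9 = binary 1001, so values are returned for the keys 0 and 3 -->
  <xsl:copy-of select="stroom:bitmap-lookup('PERMISSIONS', '9', $time)"/>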

Context Data Lookups

Some event streams have a Context stream associated with them. Context streams allow the system sending the events to supply an additional stream of data that provides context to the raw event stream. This can be useful when the system sending the events has no control over the event content but needs to supply additional information. The context stream can be used in lookups as a reference source to decorate events on receipt. Context reference data is specific to a single event stream so is transient in nature; the On-Heap store is therefore used to hold it for the duration of the event stream processing only.

Typically the reference loader for a context stream will include a translation step to convert the raw context data into reference-data:2 XML.
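
As a sketch, assuming a context reference loader that populates a hypothetical map named CONTEXT with a Machine key, a context lookup looks like any other lookup, though no lookup time is needed as the data is specific to the stream being processed:

  <!-- CONTEXT and Machine are assumed names from the context reference loader -->
  <Device>
    <xsl:value-of select="stroom:lookup('CONTEXT', 'Machine')"/>
  </Device>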

Reference Data API

See Reference Data API.