Pipeline Elements
Reader
Reader elements decode the data in raw byte form using the Feed’s configured character encoding. Some of them will also transform the data at the character level before the data are parsed into a structured form.
BOMRemovalFilterInput
Removes the Byte Order Mark (if present) from the stream.
BadTextXMLFilterReader
Escapes the content of a configured list of named XML elements that are known to potentially contain un-escaped XML reserved characters.
For example the element <Expression>$time < now()</Expression> would be transformed to <Expression>$time &lt; now()</Expression> if property leafList is set to Expression.
Element properties:
Name | Description | Default Value | Value Type |
---|---|---|---|
tags | A comma separated list of XML element names (case sensitive) between which non-escaped XML characters will be escaped, e.g. ‘>’ => ‘&gt;’. | - | String |
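The escaping described above can be sketched in a few lines of Python. This is only an illustration of the idea, not Stroom's implementation: the function name escape_leaf_content is hypothetical, and it assumes the named elements carry no attributes and that their content has not already been escaped.

```python
import re
from xml.sax.saxutils import escape


def escape_leaf_content(text: str, tags: list[str]) -> str:
    """Escape &, < and > found between <tag> and </tag> for each named tag."""
    for tag in tags:
        # Assumption: the opening tag has no attributes, e.g. <Expression>.
        pattern = re.compile(
            r"(<{0}>)(.*?)(</{0}>)".format(re.escape(tag)), re.DOTALL
        )
        text = pattern.sub(
            lambda m: m.group(1) + escape(m.group(2)) + m.group(3), text
        )
    return text


raw = "<Expression>$time < now()</Expression>"
fixed = escape_leaf_content(raw, ["Expression"])
print(fixed)
```

Elements that are not named in the list are left untouched, so any reserved characters they contain would still cause the downstream parser to fail.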
FindReplaceFilter
Replaces strings or regexes with new strings.
Element properties:
Name | Description | Default Value | Value Type |
---|---|---|---|
bufferSize | The number of characters to buffer when matching the regex. | 1000 | Integer |
dotAll | Let ‘.’ match all characters in a regex. | false | Boolean |
escapeFind | Whether or not to escape find pattern or text. | true | Boolean |
escapeReplacement | Whether or not to escape replacement text. | true | Boolean |
find | The text or regex pattern to find and replace. | - | String |
maxReplacements | The maximum number of times to try and replace text. There is no limit by default. | - | String |
regex | Whether the pattern should be treated as a literal or a regex. | false | Boolean |
replacement | The replacement text. | - | String |
showReplacementCount | Show total replacement count | true | Boolean |
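As a rough illustration of how the find, replacement, regex, dotAll and maxReplacements properties interact, the following Python sketch mimics them with the standard re module. The function and parameter names are hypothetical, and the mapping onto the real element (in particular the escapeFind and escapeReplacement behaviour and the buffering) is an assumption.

```python
import re


def find_replace(text, find, replacement, regex=False, dot_all=False,
                 max_replacements=None):
    """Return (new_text, replacement_count) for a literal or regex replace."""
    if max_replacements == 0:
        return text, 0
    flags = re.DOTALL if dot_all else 0
    # A literal find string is escaped so that '.' etc. match themselves.
    pattern = find if regex else re.escape(find)
    # A literal replacement is supplied via a function so that backslashes
    # and group references in it are not interpreted.
    repl = replacement if regex else (lambda _match: replacement)
    # In re.subn a count of 0 means "no limit".
    count = max_replacements if max_replacements is not None else 0
    return re.subn(pattern, repl, text, count=count, flags=flags)


print(find_replace("a.b.c", ".", "-"))
print(find_replace("x1y22", r"\d+", "#", regex=True))
```

The second element of the returned tuple corresponds to the kind of total that showReplacementCount would report.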
InvalidCharFilterReader
Removes any characters that are not in the standard XML character set. The version of XML (e.g. 1.0 or 1.1) can be set using the ‘xmlVersion’ property.
Element properties:
Name | Description | Default Value | Value Type |
---|---|---|---|
warnOnRemoval | Log a warning if any characters have been removed from the input stream. | true | Boolean |
xmlVersion | XML version, e.g. ‘1.0’ or ‘1.1’ | 1.1 | String |
InvalidXMLCharFilterReader
Replaces any characters that are not in the standard XML character set with a ‘�’. The version of XML (e.g. 1.0 or 1.1) can be set using the ‘xmlVersion’ property.
Element properties:
Name | Description | Default Value | Value Type |
---|---|---|---|
warnOnReplacement | Log a warning if any characters have been replaced in the input stream. | true | Boolean |
xmlVersion | XML version, e.g. ‘1.0’ or ‘1.1’ | 1.1 | String |
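The difference between removing and replacing invalid characters can be sketched as follows. The character ranges are those of the Char production in the XML 1.0 specification; the function names are hypothetical and XML 1.1 (the default for these elements) permits a wider set that is not modelled here.

```python
def is_valid_xml10_char(ch: str) -> bool:
    """True if ch is allowed by the XML 1.0 'Char' production."""
    cp = ord(ch)
    return (
        cp in (0x9, 0xA, 0xD)
        or 0x20 <= cp <= 0xD7FF
        or 0xE000 <= cp <= 0xFFFD
        or 0x10000 <= cp <= 0x10FFFF
    )


def filter_invalid(text: str, replacement: str = "") -> str:
    """Drop invalid characters, or swap each one for `replacement`."""
    return "".join(
        ch if is_valid_xml10_char(ch) else replacement for ch in text
    )


removed = filter_invalid("a\x00b")             # like InvalidCharFilterReader
replaced = filter_invalid("a\x0bb", "\ufffd")  # like InvalidXMLCharFilterReader
```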
Reader
A basic reader that simply decodes the raw bytes using the Feed’s configured character encoding. It does not transform the data in any other way.
Parser
Parser elements parse raw text data that has an expected structure (e.g. XML, JSON, CSV) into XML events (elements, attributes, text, etc) that can be further validated or transformed using XSLT. The choice of Parser will be dictated by the structure of the data. If no Reader is used before the Parser, the Parser will also do the job of a simple Reader and decode the raw bytes using the Feed’s configured character encoding.
CombinedParser
The original general-purpose reader/parser that covers all source data types but provides less flexibility than the source format-specific parsers such as dsParser.
It effectively combines a BOMRemovalFilterInput, an InvalidCharFilterReader and a Parser (based on the type property).
Warning
It is strongly recommended to instead use a combination of Readers and one of the type specific Parsers. This will make the intent of the pipeline much clearer and allow for much greater control.
Element properties:
Name | Description | Default Value | Value Type |
---|---|---|---|
fixInvalidChars | Fix invalid XML characters from the input stream. | false | Boolean |
namePattern | A name pattern to load a text converter dynamically. | - | String |
suppressDocumentNotFoundWarnings | If the text converter cannot be found to match the name pattern suppress warnings. | false | Boolean |
textConverter | The text converter configuration that should be used to parse the input data. | - | Document |
type | The parser type, e.g. ‘JSON’, ‘XML’, ‘Data Splitter’. | - | String |
DSParser
A parser for handling structured plain text data (e.g. CSV or fixed width fields) using the Data Splitter domain specific language. For more details see Data Splitter.
Element properties:
Name | Description | Default Value | Value Type |
---|---|---|---|
namePattern | A name pattern to load a data splitter dynamically. | - | String |
suppressDocumentNotFoundWarnings | If the data splitter cannot be found to match the name pattern suppress warnings. | false | Boolean |
textConverter | The data splitter configuration that should be used to parse the input data. | - | Document |
JSONParser
A built-in parser for parsing JSON source data (in JSON fragment format) into an XML representation of the JSON. The resulting XML will conform to the http://www.w3.org/2013/XSL/json namespace.
Element properties:
Name | Description | Default Value | Value Type |
---|---|---|---|
addRootObject | Add a root map element. | true | Boolean |
allowBackslashEscapingAnyCharacter | Feature that can be enabled to accept quoting of all characters using the backslash quoting mechanism. If not enabled, only characters that are explicitly listed by the JSON specification can be escaped in this way (see the JSON spec for the small list of these characters). | false | Boolean |
allowComments | Feature that determines whether parser will allow use of Java/C++ style comments (both ‘/’+’*’ and ‘//’ varieties) within parsed content or not. | false | Boolean |
allowMissingValues | Feature allows the support for “missing” values in a JSON array: missing value meaning sequence of two commas, without value in-between but only optional white space. | false | Boolean |
allowNonNumericNumbers | Feature that allows parser to recognize set of “Not-a-Number” (NaN) tokens as legal floating number values (similar to how many other data formats and programming language source code allows it). | false | Boolean |
allowNumericLeadingZeros | Feature that determines whether parser will allow JSON integral numbers to start with additional (ignorable) zeroes (like: 000001). | false | Boolean |
allowSingleQuotes | Feature that determines whether parser will allow use of single quotes (apostrophe, character ‘'’) for quoting Strings (names and String values). If so, this is in addition to other acceptable markers, but is not permitted by the JSON specification. | false | Boolean |
allowTrailingComma | Feature that determines whether we will allow for a single trailing comma following the final value (in an Array) or member (in an Object). These commas will simply be ignored. | false | Boolean |
allowUnquotedControlChars | Feature that determines whether parser will allow JSON Strings to contain unquoted control characters (ASCII characters with value less than 32, including tab and line feed characters) or not. If feature is set false, an exception is thrown if such a character is encountered. | false | Boolean |
allowUnquotedFieldNames | Feature that determines whether parser will allow use of unquoted field names (which is allowed by Javascript, but not by JSON specification). | false | Boolean |
allowYamlComments | Feature that determines whether parser will allow use of YAML comments, ones starting with ‘#’ and continuing until the end of the line. This commenting style is common with scripting languages as well. | false | Boolean |
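To give a feel for what an XML representation of JSON looks like, the sketch below converts a parsed JSON value into elements in the http://www.w3.org/2013/XSL/json namespace. The element names (map, array, string, number, boolean, null) and the key attribute are assumptions based on the W3C JSON-to-XML mapping, not taken from this page, so check them against real JSONParser output before relying on them.

```python
import json
import xml.etree.ElementTree as ET

NS = "http://www.w3.org/2013/XSL/json"


def to_xml(value, key=None):
    """Recursively convert a parsed JSON value to an XML element."""
    if isinstance(value, dict):
        el = ET.Element(f"{{{NS}}}map")
        for k, v in value.items():
            el.append(to_xml(v, k))
    elif isinstance(value, list):
        el = ET.Element(f"{{{NS}}}array")
        for v in value:
            el.append(to_xml(v))
    elif isinstance(value, bool):  # must come before the int check
        el = ET.Element(f"{{{NS}}}boolean")
        el.text = "true" if value else "false"
    elif value is None:
        el = ET.Element(f"{{{NS}}}null")
    elif isinstance(value, (int, float)):
        el = ET.Element(f"{{{NS}}}number")
        el.text = json.dumps(value)
    else:
        el = ET.Element(f"{{{NS}}}string")
        el.text = str(value)
    if key is not None:
        el.set("key", key)  # assumed attribute carrying the member name
    return el


root = to_xml(json.loads('{"user": "jbloggs", "ids": [1, 2], "ok": true}'))
print(ET.tostring(root, encoding="unicode"))
```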
XMLFragmentParser
A parser to convert multiple XML fragments into an XML document. For example the data may contain:
<Event>...</Event>
<Event>...</Event>
i.e. with no root element, so not valid XML.
The XMLFragmentParser will wrap the fragments with a root element as defined in the TextConverter document configured with the textConverterRef property.
Element properties:
Name | Description | Default Value | Value Type |
---|---|---|---|
namePattern | A name pattern to load a text converter dynamically. | - | String |
suppressDocumentNotFoundWarnings | If the text converter cannot be found to match the name pattern suppress warnings. | false | Boolean |
textConverter | The XML fragment wrapper that should be used to wrap the input XML. | - | Document |
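The effect of wrapping fragments in a root element can be sketched in Python as below. This is a simplification under stated assumptions: the real wrapper is defined by the configured TextConverter document, whereas here a hypothetical root element name (Events) is simply concatenated around the fragments.

```python
import xml.etree.ElementTree as ET


def wrap_fragments(fragments: str, root_name: str = "Events") -> ET.Element:
    """Wrap root-less XML fragments in a single root element and parse."""
    wrapped = f"<{root_name}>{fragments}</{root_name}>"
    return ET.fromstring(wrapped)


data = "<Event>a</Event>\n<Event>b</Event>"

try:
    ET.fromstring(data)  # two top-level elements: not well-formed XML
    parsed_unwrapped = True
except ET.ParseError:
    parsed_unwrapped = False

root = wrap_fragments(data)
print(parsed_unwrapped, root.tag, len(root))
```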
XMLParser
A parser to parse data that is expected to be XML into a series of XML events that can be consumed by a Filter element.
Filter
Filter elements work with XML events that have been generated by a parser. They can consume the events without modifying them, e.g. RecordCountFilter, or modify them in some way, e.g. XSLTFilter. Multiple filters can be used one after another, with each using the output from the last as its input.
DynamicIndexingFilter
A filter to send source data to an index.
Element properties:
Name | Description | Default Value | Value Type |
---|---|---|---|
index | The index to send records to. | - | Document |
DynamicSearchResultOutputFilter
Used in a search extraction pipeline for extracting field values that have not been stored in the index and where the fields are dynamic and derived from the data rather than being defined in the Index settings.
Consumes XML events in the index-documents:1 namespace to convert them into a form so that they can be used in a Dashboard/Query/Analytic.
ElasticIndexingFilter
A filter consuming XML events in the records:2 namespace to index/store the fields and their values in an Elasticsearch Index.
Element properties:
Name | Description | Default Value | Value Type |
---|---|---|---|
batchSize | Maximum number of documents to index in each bulk request. | 10000 | Integer |
cluster | Target Elasticsearch cluster. | - | Document |
indexName | Name of the Elasticsearch index. Variables specified such as {year} are replaced with the corresponding field values contained in the document root. Field names beginning with an underscore are not written to the document and are only used in the index name pattern. | - | String |
ingestPipeline | Name of the Elasticsearch ingest pipeline to execute when indexing. | - | String |
purgeOnReprocess | When reprocessing a stream, first delete any documents from the index matching the source stream ID. | true | Boolean |
refreshAfterEachBatch | Refresh the index after each batch is processed, making the indexed documents visible to searches. | false | Boolean |
HttpPostFilter
This element is deprecated; you should instead use the much more flexible HTTPAppender. This element will simply POST the output of the XML events to the configured URL.
Element properties:
Name | Description | Default Value | Value Type |
---|---|---|---|
receivingApiUrl | The URL of the receiving API. | - | String |
IdEnrichmentFilter
Adds the attributes ‘StreamId’ and ‘EventId’ to the ’event’ element to enrich the event with its ordinal number in the stream and the ID of the stream that it belongs to.
ID enrichment is required to be able to index events as it provides them with an ID that is unique within Stroom.
It assumes that a record/event is an XML element at the first level below the root element, i.e. for ’event-logging:3’ XML this means the <Event> element.
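A minimal Python sketch of this kind of enrichment is shown below. It is not Stroom's code: the function name is hypothetical, the stream ID is passed in by hand, and numbering events from 1 is an assumption.

```python
import xml.etree.ElementTree as ET


def enrich_ids(xml_text: str, stream_id: int) -> ET.Element:
    """Add StreamId and EventId attributes to each first-level element."""
    root = ET.fromstring(xml_text)
    # Assumption: event ordinals start at 1 within the stream.
    for event_id, record in enumerate(root, start=1):
        record.set("StreamId", str(stream_id))
        record.set("EventId", str(event_id))
    return root


enriched = enrich_ids("<Events><Event/><Event/></Events>", 1234)
print(ET.tostring(enriched, encoding="unicode"))
```

The pair of attributes together identifies a single event, which is what makes the events addressable once indexed.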
IndexingFilter
A filter consuming XML events in the records:2 namespace to index/store the fields and their values in a Lucene Index.
Element properties:
Name | Description | Default Value | Value Type |
---|---|---|---|
index | The index to send records to. | - | Document |
RecordCountFilter
Counts events/records in the stream.
An event/record is taken to be an XML element that is at the first level below the root element, i.e. for ’event-logging:3’ XML this means the <Event> element.
Element properties:
Name | Description | Default Value | Value Type |
---|---|---|---|
countRead | Is this filter counting records read or records written? | true | Boolean |
RecordOutputFilter
Filters out records/events that have raised an Error or Fatal Error during processing.
If all records/events have raised at least an Error then no XML events will be output.
It assumes that a record/event is an XML element at the first level below the root element, i.e. for ’event-logging:3’ XML this means the <Event> element.
ReferenceDataFilter
Takes XML input (conforming to the reference-data:2 schema) and loads the data into the Reference Data Store. Reference data values can be either simple strings or XML fragments.
Element properties:
Name | Description | Default Value | Value Type |
---|---|---|---|
overrideExistingValues | Allow duplicate keys to override existing values? | true | Boolean |
warnOnDuplicateKeys | Warn if there are duplicate keys found in the reference data? | false | Boolean |
SafeXMLFilter
Restricts the characters to a very simple set consisting of [a-zA-Z0-9] and [ .:-_/].
All other characters are replaced by ~NNN, where NNN is a three digit codepoint for the replaced character.
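The replacement scheme can be sketched as follows. Two points are assumptions rather than facts from this page: the second set is read as the literal characters space, '.', ':', '-', '_' and '/' (not as a regex range), and NNN is taken to be the decimal code point zero-padded to three digits.

```python
import string

# Assumed allowed set: ASCII letters, digits and the literal characters " .:-_/".
ALLOWED = set(string.ascii_letters + string.digits + " .:-_/")


def make_safe(text: str) -> str:
    """Replace every character outside ALLOWED with ~NNN."""
    return "".join(
        ch if ch in ALLOWED else f"~{ord(ch):03d}" for ch in text
    )


print(make_safe("a<b>"))
```

Code points above 999 would need more than three digits under this reading, so how the real filter treats them is left open here.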
SchemaFilter
Checks the format of the source data against one of a number of XML schemas. This ensures that if non-compliant data is generated, it will be flagged as in error and will not be passed to any subsequent processing elements.
Element properties:
Name | Description | Default Value | Value Type |
---|---|---|---|
namespaceURI | Limits the schemas that can be used to validate data to those with a matching namespace URI. | - | String |
schemaGroup | Limits the schemas that can be used to validate data to those with a matching schema group name. | - | String |
schemaLanguage | The schema language that the schema is written in. | http://www.w3.org/2001/XMLSchema | String |
schemaValidation | Should schema validation be performed? | true | Boolean |
systemId | Limits the schemas that can be used to validate data to those with a matching system id. | - | String |
SearchResultOutputFilter
Used in a search extraction pipeline for extracting field values that have not been stored in the index and where the field definitions are defined in the Index settings.
Consumes XML events in the records:2
namespace to convert them into a form so that they can be used in a Dashboard/Query/Analytic.
SolrIndexingFilter
Delivers source data to the specified index in an external Solr instance/cluster.
Element properties:
Name | Description | Default Value | Value Type |
---|---|---|---|
batchSize | How many documents to send to the index in a single post. | 1000 | Integer |
commitWithinMs | Commit indexed documents within the specified number of milliseconds. | -1 | Integer |
index | The index to send records to. | - | Document |
softCommit | Perform a soft commit after every batch so that docs are available for searching immediately (if using NRT replicas). | true | Boolean |
SplitFilter
Splits multi-record source data into smaller groups of records prior to delivery to an XSLT. This allows the XSLT to process data more efficiently than loading a potentially huge input stream into memory.
Element properties:
Name | Description | Default Value | Value Type |
---|---|---|---|
splitCount | The number of elements at the split depth to count before the XML is split. | 10000 | Integer |
splitDepth | The depth of XML elements to split at. | 1 | Integer |
storeLocations | Should this split filter store processing locations. | true | Boolean |
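The batching behaviour can be illustrated with the Python sketch below, which groups first-level records (the equivalent of splitDepth 1) into documents of at most split_count records, each with a copy of the original root element. It is only a sketch with hypothetical names; unlike a streaming filter it parses the whole input into memory first.

```python
import xml.etree.ElementTree as ET


def split_records(xml_text: str, split_count: int):
    """Yield root elements that each hold at most split_count records."""
    root = ET.fromstring(xml_text)
    records = list(root)
    for start in range(0, len(records), split_count):
        # Each chunk gets its own copy of the root tag and attributes.
        chunk = ET.Element(root.tag, root.attrib)
        chunk.extend(records[start:start + split_count])
        yield chunk


chunks = list(split_records("<r><e/><e/><e/><e/><e/></r>", 2))
print([len(c) for c in chunks])
```

Each yielded chunk is a complete document, so a downstream XSLT only ever sees a bounded number of records at a time.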
StatisticsFilter
An element to allow the source data (conforming to the statistics XML Schema) to be sent to the MySQL based statistics data store.
Element properties:
Name | Description | Default Value | Value Type |
---|---|---|---|
statisticsDataSource | The statistics data source to record statistics against. | - | Document |
StroomStatsFilter
An element to allow the source data (conforming to the statistics XML Schema) to be sent to an external stroom-stats service.
Element properties:
Name | Description | Default Value | Value Type |
---|---|---|---|
flushOnSend | At the end of the stream, wait for acknowledgement from the Kafka broker for all the messages sent. This ensures errors are caught in the pipeline process. | true | Boolean |
kafkaConfig | The Kafka config to use. | - | Document |
statisticsDataSource | The stroom-stats data source to record statistics against. | - | Document |
XPathExtractionOutputFilter
TODO - Add description
Element properties:
Name | Description | Default Value | Value Type |
---|---|---|---|
multipleValueDelimiter | The string to delimit multiple simple values. | , | String |
XSLTFilter
An element used to transform XML data from one form to another using XSLT. The specified XSLT can be used to transform the input XML into XML conforming to another schema or into other forms such as JSON, plain text, etc.
Element properties:
Name | Description | Default Value | Value Type |
---|---|---|---|
suppressXSLTNotFoundWarnings | If XSLT cannot be found to match the name pattern suppress warnings. | false | Boolean |
usePool | Advanced: Choose whether or not you want to use cached XSLT templates to improve performance. | true | Boolean |
xslt | The XSLT to use. | - | Document |
xsltNamePattern | A name pattern to load XSLT dynamically. | - | String |
Writer
Writers consume XML events (from Parsers and Filters) and convert them into a stream of bytes using the character encoding configured on the Writer (if applicable). The output data can then be fed to a Destination.
JSONWriter
Writer to convert XML data conforming to the http://www.w3.org/2013/XSL/json XML Schema into JSON format.
Element properties:
Name | Description | Default Value | Value Type |
---|---|---|---|
encoding | The output character encoding to use. | UTF-8 | String |
indentOutput | Should output JSON be indented and include new lines (pretty printed)? | false | Boolean |
TextWriter
Writer to convert XML character data events into plain text output.
Element properties:
Name | Description | Default Value | Value Type |
---|---|---|---|
encoding | The output character encoding to use. | UTF-8 | String |
footer | Footer text that can be added to the output at the end. | - | String |
header | Header text that can be added to the output at the start. | - | String |
XMLWriter
Writer to convert XML events data into XML output in the specified character encoding.
Element properties:
Name | Description | Default Value | Value Type |
---|---|---|---|
encoding | The output character encoding to use. | UTF-8 | String |
indentOutput | Should output XML be indented and include new lines (pretty printed)? | false | Boolean |
suppressXSLTNotFoundWarnings | If XSLT cannot be found to match the name pattern suppress warnings. | false | Boolean |
xslt | A previously saved XSLT, used to modify the output via xsl:output attributes. | - | Document |
xsltNamePattern | A name pattern for dynamic loading of an XSLT that will modify the output via xsl:output attributes. | - | String |
Destination
Destination elements consume a stream of bytes from a Writer and persist them to a destination. This could be a file on a file system or Stroom’s stream store.
AnnotationWriter
Consumes XML documents in the annotation:1 namespace and writes them as Stroom Annotations.
Allows for the annotating of events that meet some criteria.
FileAppender
A destination used to write an output stream to a file on the file system. If multiple paths are specified in the ‘outputPaths’ property it will pick one at random to write to.
Element properties:
Name | Description | Default Value | Value Type |
---|---|---|---|
filePermissions | Set file system permissions of finished files (example: ‘rwxr--r--’) | - | String |
outputPaths | One or more destination paths for output files separated with commas. Replacement variables can be used in path strings such as ${feed}. | - | String |
rollSize | When the current output file exceeds this size it will be closed and a new one created. | - | String |
splitAggregatedStreams | Choose if you want to split aggregated streams into separate output files. | false | Boolean |
splitRecords | Choose if you want to split individual records into separate output files. | false | Boolean |
useCompression | Apply GZIP compression to output files | false | Boolean |
HDFSFileAppender
A destination used to write an output stream to a file on a Hadoop Distributed File System. If multiple paths are specified in the ‘outputPaths’ property it will pick one at random.
Element properties:
Name | Description | Default Value | Value Type |
---|---|---|---|
fileSystemUri | URI for the Hadoop Distributed File System (HDFS) to connect to, e.g. hdfs://mynamenode.mydomain.com:8020 | - | String |
outputPaths | One or more destination paths for output files separated with commas. Replacement variables can be used in path strings such as ${feed}. | - | String |
rollSize | When the current output file exceeds this size it will be closed and a new one created. | - | String |
runAsUser | The user to connect to HDFS as | - | String |
splitAggregatedStreams | Choose if you want to split aggregated streams into separate output files. | false | Boolean |
splitRecords | Choose if you want to split individual records into separate output files. | false | Boolean |
HTTPAppender
A destination used to write an output stream to a remote HTTP(S) server.
This element should be preferred over the deprecated HttpPostFilter.
Element properties:
Name | Description | Default Value | Value Type |
---|---|---|---|
connectionTimeout | How long to wait before we abort sending data due to connection timeout | - | String |
contentType | The content type | application/json | String |
forwardChunkSize | Should data be sent in chunks and if so how big should the chunks be | - | String |
forwardUrl | The URL to send data to | - | String |
hostnameVerificationEnabled | Verify host names | true | Boolean |
httpHeadersIncludeStreamMetaData | Provide stream metadata as HTTP headers | true | Boolean |
httpHeadersUserDefinedHeader1 | Additional HTTP Header 1, format is ‘HeaderName: HeaderValue’ | - | String |
httpHeadersUserDefinedHeader2 | Additional HTTP Header 2, format is ‘HeaderName: HeaderValue’ | - | String |
httpHeadersUserDefinedHeader3 | Additional HTTP Header 3, format is ‘HeaderName: HeaderValue’ | - | String |
keyStorePassword | The key store password | - | String |
keyStorePath | The key store file path on the server | - | String |
keyStoreType | The key store type | JKS | String |
logMetaKeys | Which meta data values will be logged in the send log | guid,feed,system,environment,remotehost,remoteaddress | String |
readTimeout | How long to wait for data to be available before closing the connection | - | String |
requestMethod | The request method, e.g. POST | POST | String |
rollSize | When the current output exceeds this size it will be closed and a new one created. | - | String |
splitAggregatedStreams | Choose if you want to split aggregated streams into separate output. | false | Boolean |
splitRecords | Choose if you want to split individual records into separate output. | false | Boolean |
sslProtocol | The SSL protocol to use | TLSv1.2 | String |
trustStorePassword | The trust store password | - | String |
trustStorePath | The trust store file path on the server | - | String |
trustStoreType | The trust store type | JKS | String |
useCompression | Should data be compressed when sending | true | Boolean |
useJvmSslConfig | Use JVM SSL config. Set this to true if the Stroom node has been configured with key/trust stores using Java system properties like ‘javax.net.ssl.keyStore’. Set this to false if you are explicitly setting key/trust store properties on this HttpAppender. | true | Boolean |
RollingFileAppender
A destination used to write an output stream to a file on the file system.
If multiple paths are specified in the ‘outputPaths’ property it will pick one at random to write to.
This is distinct from the FileAppender in that when the rollSize is reached it will move the current file to the path specified in rolledFileName and resume writing to the original path.
This allows other processes to follow the changes to a single file path, e.g. when using tail.
On system shutdown all active files will be rolled.
Element properties:
Name | Description | Default Value | Value Type |
---|---|---|---|
fileName | Choose the name of the file to write. | - | String |
filePermissions | Set file system permissions of finished files (example: ‘rwxr--r--’) | - | String |
frequency | Choose how frequently files are rolled. | 1h | String |
outputPaths | One or more destination paths for output files separated with commas. Replacement variables can be used in path strings such as ${feed}. | - | String |
rollSize | When the current output file exceeds this size it will be closed and a new one created, e.g. 10M, 1G. | 100M | String |
rolledFileName | Choose the name that files will be renamed to when they are rolled. | - | String |
schedule | Provide a cron expression to determine when files are rolled. | - | String |
useCompression | Apply GZIP compression to output files | false | Boolean |
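The roll-by-size behaviour (write to a fixed path, move the file aside once the limit is reached, then carry on at the original path) can be sketched in Python as below. The class name, the {n} counter in the rolled file name and the byte-based roll_size are all hypothetical; time-based rolling via frequency or schedule is not modelled.

```python
import os
import tempfile


class RollingWriter:
    """Append to one path and move the file aside when it reaches roll_size."""

    def __init__(self, path: str, rolled_pattern: str, roll_size: int):
        self.path = path
        self.rolled_pattern = rolled_pattern  # e.g. "out-{n}.log" (assumed form)
        self.roll_size = roll_size            # size in bytes
        self.rolled = 0

    def write(self, data: bytes) -> None:
        with open(self.path, "ab") as f:
            f.write(data)
        if os.path.getsize(self.path) >= self.roll_size:
            self.roll()

    def roll(self) -> None:
        # Move the current file to its rolled name; the next write
        # recreates the original path.
        if os.path.exists(self.path) and os.path.getsize(self.path) > 0:
            self.rolled += 1
            os.replace(self.path, self.rolled_pattern.format(n=self.rolled))


with tempfile.TemporaryDirectory() as tmp:
    writer = RollingWriter(
        os.path.join(tmp, "out.log"), os.path.join(tmp, "out-{n}.log"), roll_size=10
    )
    for chunk in (b"12345", b"67890", b"abc"):
        writer.write(chunk)
    files = sorted(os.listdir(tmp))

print(files)
```

Because the active file always lives at the same path, a process following it (for example with tail) only has to cope with the file being replaced, not with a changing file name.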
RollingStreamAppender
A destination used to write one or more output streams to a new stream which is then rolled when it reaches a certain size or age. A new stream will be created after the size or age criteria have been met. On system shutdown all active streams will be rolled.
Element properties:
Name | Description | Default Value | Value Type |
---|---|---|---|
feed | The feed that output stream should be written to. If not specified the feed the input stream belongs to will be used. | - | Document |
frequency | Choose how frequently streams are rolled. | 1h | String |
rollSize | Choose the maximum size that a stream can be before it is rolled. | 100M | String |
schedule | Provide a cron expression to determine when streams are rolled. | - | String |
segmentOutput | Should the output stream be marked with indexed segments to allow fast access to individual records? | true | Boolean |
streamType | The stream type that the output stream should be written as. This must be specified. | - | String |
volumeGroup | Optionally override the default volume group of the destination feed. | - | String |
StandardKafkaProducer
Consumes XML documents in the kafka-records:2 namespace.
Each <kafkaRecord> element is converted into a Kafka message that is passed to the Kafka producer defined by the kafkaConfig property.
Element properties:
Name | Description | Default Value | Value Type |
---|---|---|---|
flushOnSend | At the end of the stream, wait for acknowledgement from the Kafka broker for all the messages sent. This ensures errors are caught in the pipeline process. | true | Boolean |
kafkaConfig | Kafka configuration details relating to where and how to send Kafka messages. | - | Document |
StreamAppender
A destination used to write the output stream to a new stream in the stream store. The configuration allows for starting a new stream once a size threshold is reached.
Element properties:
Name | Description | Default Value | Value Type |
---|---|---|---|
feed | The feed that output stream should be written to. If not specified the feed the input stream belongs to will be used. | - | Document |
rollSize | When the current output stream exceeds this size it will be closed and a new one created. | - | String |
segmentOutput | Should the output stream be marked with indexed segments to allow fast access to individual records? | true | Boolean |
splitAggregatedStreams | Choose if you want to split aggregated streams into separate output streams. | false | Boolean |
splitRecords | Choose if you want to split individual records into separate output streams. | false | Boolean |
streamType | The stream type that the output stream should be written as. This must be specified. | - | String |
volumeGroup | Optionally override the default volume group of the destination feed. | - | String |
StroomStatsAppender
This element is deprecated and should not be used.
Element properties:
Name | Description | Default Value | Value Type |
---|---|---|---|
flushOnSend | At the end of the stream, wait for acknowledgement from the Kafka broker for all the messages sent. This ensures errors are caught in the pipeline process. | true | Boolean |
kafkaConfig | The Kafka config to use. | - | Document |
maxRecordCount | Choose the maximum number of records or events that a message will contain | 1 | String |
statisticsDataSource | The stroom-stats data source to record statistics against. | - | Document |