Ingest Aggregation Example
Simple Aggregation
To demonstrate basic aggregation at ingest, we can take the following graph as a starting point and modify the schema so that the properties are summed together:
graph LR
A(["Person
ID: Dave"])
--
"Commit
added: 6
removed: 8"
-->
B(["Repository
ID: r1"])
A
--
"Commit
added: 35
removed: 10"
-->
B
As you can see, we have two entity groups, Person and Repository, both without any properties. We have one edge type, Commit, with two properties: added and removed. Translating this into a basic Gaffer schema gives the following:
Note
In Gaffer, every property type used in the schema must specify an "aggregateFunction" unless aggregation has been disabled by setting "aggregate": false on the element group.
{
"edges": {
"Commit": {
"source": "id.person.string",
"destination": "id.repo.string",
"directed": "true",
"properties": {
"added": "property.integer",
"removed": "property.integer"
}
}
},
"entities": {
"Person": {
"description": "Entity representing a person vertex",
"vertex": "id.person.string"
},
"Repository": {
"description": "Entity representing a repository vertex",
"vertex": "id.repo.string"
}
}
}
{
"types": {
"id.person.string": {
"description": "A basic type to hold the string id of a person entity",
"class": "java.lang.String"
},
"id.repo.string": {
"description": "A basic type to hold the string id of a repository entity",
"class": "java.lang.String"
},
"property.integer": {
"description": "A basic type to hold integer properties of elements",
"class": "java.lang.Integer",
"aggregateFunction": {
"class": "uk.gov.gchq.koryphe.impl.binaryoperator.Sum"
}
},
"true": {
"description": "A simple boolean that must always be true.",
"class": "java.lang.Boolean",
"validateFunctions": [
{
"class": "uk.gov.gchq.koryphe.impl.predicate.IsTrue"
}
]
}
}
}
In the above schema you can see that we have applied an aggregation function to the "property.integer" type, which sums the property values to give a total. For this function we must specify the class that performs the aggregation. There are a few default classes, plus additional ones implemented in the Koryphe module, which you can read more about in the reference guide.
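To make the mechanics concrete, here is a minimal plain-Java sketch of what ingest aggregation does with this schema. It assumes that elements are merged when they share the same group, source, destination and directed flag, and that each property is then combined pairwise with its type's aggregateFunction. The CommitEdge class and aggregate method are hypothetical stand-ins, not Gaffer's API, and a JDK BinaryOperator built from Integer::sum plays the role of Koryphe's Sum.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

// Plain-Java model of ingest aggregation for the simple example.
// This is NOT Gaffer's implementation; CommitEdge is a hypothetical stand-in.
final class SumAggregationSketch {

    // Minimal stand-in for an edge in the "Commit" group.
    static final class CommitEdge {
        final String source;
        final String destination;
        final boolean directed;
        final Map<String, Integer> properties;

        CommitEdge(String source, String destination, boolean directed,
                   Map<String, Integer> properties) {
            this.source = source;
            this.destination = destination;
            this.directed = directed;
            this.properties = properties;
        }
    }

    // Plays the role of the "property.integer" type's Sum aggregateFunction.
    static final BinaryOperator<Integer> SUM = Integer::sum;

    // Merges edges sharing source, destination and directed flag (all inputs are
    // assumed to be in the Commit group), folding each property with SUM.
    static List<CommitEdge> aggregate(List<CommitEdge> edges) {
        Map<List<Object>, CommitEdge> merged = new LinkedHashMap<>();
        for (CommitEdge edge : edges) {
            List<Object> key = List.of(edge.source, edge.destination, edge.directed);
            merged.merge(key, edge, (a, b) -> {
                Map<String, Integer> props = new LinkedHashMap<>(a.properties);
                b.properties.forEach((name, value) -> props.merge(name, value, SUM));
                return new CommitEdge(a.source, a.destination, a.directed, props);
            });
        }
        return new ArrayList<>(merged.values());
    }

    public static void main(String[] args) {
        List<CommitEdge> input = List.of(
                new CommitEdge("Dave", "r1", true, Map.of("added", 6, "removed", 8)),
                new CommitEdge("Dave", "r1", true, Map.of("added", 35, "removed", 10)));
        CommitEdge total = aggregate(input).get(0);
        // Prints added=41, removed=18
        System.out.println("added=" + total.properties.get("added")
                + ", removed=" + total.properties.get("removed"));
    }
}
```

The two Commit edges from the graph collapse into one edge whose properties are the sums 41 and 18.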
Tip
It is possible to create your own aggregation functions; however, they must implement the java.util.function.BinaryOperator interface, which is a BiFunction whose two inputs and result all share the same type.
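As a hedged illustration of that contract, the sketch below implements a hypothetical aggregator that keeps the longer of two strings. It only uses the JDK; Koryphe's own operators extend a KorypheBinaryOperator base class, which you may prefer for anything referenced from a real schema. Assuming a store may combine elements in any order and grouping, the sketch breaks ties lexicographically so that the result is associative and commutative.

```java
import java.util.function.BinaryOperator;

// Hypothetical custom aggregator that keeps the longer of two strings.
// It only illustrates the BinaryOperator contract; it is not a Koryphe class.
final class LongestString implements BinaryOperator<String> {

    @Override
    public String apply(String a, String b) {
        // Treat a missing value as "nothing to merge" so nulls do not break aggregation.
        if (a == null) {
            return b;
        }
        if (b == null) {
            return a;
        }
        if (a.length() != b.length()) {
            return a.length() > b.length() ? a : b;
        }
        // Equal lengths: tie-break lexicographically so argument order does not matter.
        return a.compareTo(b) >= 0 ? a : b;
    }

    public static void main(String[] args) {
        LongestString op = new LongestString();
        // Prints "refactor parser"
        System.out.println(op.apply("fix typo", "refactor parser"));
    }
}
```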
By loading the data into a Graph that uses the example schema, we can see the aggregation in action. First, load the data via the REST API using the AddElements operation like below:
{
"class": "AddElements",
"input": [
{
"class": "Edge",
"group": "Commit",
"source": "Dave",
"destination": "r1",
"directed": true,
"properties": {
"added": 6,
"removed": 8
}
},
{
"class": "Edge",
"group": "Commit",
"source": "Dave",
"destination": "r1",
"directed": true,
"properties": {
"added": 35,
"removed": 10
}
},
{
"class": "Entity",
"group": "Person",
"vertex": "Dave"
},
{
"class": "Entity",
"group": "Repository",
"vertex": "r1"
}
]
}
Now, running a query on these elements with "Dave" as the seed, you can see that all the Commit edges have been aggregated together to give totals for the added and removed properties.
[
{
"class": "uk.gov.gchq.gaffer.data.element.Entity",
"group": "Person",
"vertex": "Dave",
"properties": {}
},
{
"class": "uk.gov.gchq.gaffer.data.element.Edge",
"group": "Commit",
"source": "Dave",
"destination": "r1",
"directed": true,
"matchedVertex": "SOURCE",
"properties": {
"removed": 18,
"added": 41
}
}
]
Using the groupBy field
It is also possible to have a fine control over exactly when aggregation is
applied to by using the groupBy
parameter. This parameter can be added to
the schema so that aggregation is applied only when a specific property is the
same between elements.
To demonstrate this functionality, we can expand the example from the previous section by adding a new property to the Commit edge called issue, which hypothetically represents the issue number the commit relates to.
Now we can add the groupBy parameter to the schema so that all Commit edges with the same issue property will be aggregated like before to sum the removed and added properties:
"edges": {
"Commit": {
"source": "id.person.string",
"destination": "id.repo.string",
"directed": "true",
"properties": {
"added": "property.integer",
"removed": "property.integer",
"issue": "property.integer"
},
"groupBy": [
"issue"
]
}
}
Now say we add the following elements to the graph and run a query to get the edges like before:
{
"class": "AddElements",
"input": [
{
"class": "Edge",
"group": "Commit",
"source": "Dave",
"destination": "r1",
"directed": true,
"properties": {
"added": 20,
"removed": 5,
"issue": 1
}
},
{
"class": "Edge",
"group": "Commit",
"source": "Dave",
"destination": "r1",
"directed": true,
"properties": {
"added": 6,
"removed": 8,
"issue": 1
}
},
{
"class": "Edge",
"group": "Commit",
"source": "Dave",
"destination": "r1",
"directed": true,
"properties": {
"added": 60,
"removed": 4,
"issue": 2
}
},
{
"class": "Edge",
"group": "Commit",
"source": "Dave",
"destination": "r1",
"directed": true,
"properties": {
"added": 35,
"removed": 10,
"issue": 2
}
},
{
"class": "Entity",
"group": "Person",
"vertex": "Dave"
},
{
"class": "Entity",
"group": "Repository",
"vertex": "r1"
}
]
}
[
{
"class": "uk.gov.gchq.gaffer.data.element.Entity",
"group": "Person",
"vertex": "Dave",
"properties": {}
},
{
"class": "uk.gov.gchq.gaffer.data.element.Edge",
"group": "Commit",
"source": "Dave",
"destination": "r1",
"directed": true,
"matchedVertex": "SOURCE",
"properties": {
"issue": 1,
"removed": 13,
"added": 26
}
},
{
"class": "uk.gov.gchq.gaffer.data.element.Edge",
"group": "Commit",
"source": "Dave",
"destination": "r1",
"directed": true,
"matchedVertex": "SOURCE",
"properties": {
"issue": 2,
"removed": 14,
"added": 95
}
}
]
As you can see, we end up with two Commit edges, one for each issue, with all other properties aggregated together.
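The effect of groupBy can be modelled by adding the groupBy property values to the key used to decide which elements are merged. The minimal plain-Java sketch below does this for the four Commit edges loaded above and reproduces the two totals in the query result. The Commit class is a hypothetical stand-in rather than Gaffer's Edge, and the sketch is not Gaffer's implementation.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of groupBy for the Commit example; NOT Gaffer's implementation.
final class GroupBySketch {

    // Hypothetical stand-in for a Commit edge carrying an issue property.
    static final class Commit {
        final String source;
        final String destination;
        final int issue;
        final int added;
        final int removed;

        Commit(String source, String destination, int issue, int added, int removed) {
            this.source = source;
            this.destination = destination;
            this.issue = issue;
            this.added = added;
            this.removed = removed;
        }
    }

    // Because "issue" is listed in groupBy, it becomes part of the aggregation key,
    // so only edges with the same issue have added and removed summed together.
    static List<Commit> aggregate(List<Commit> commits) {
        Map<List<Object>, Commit> merged = new LinkedHashMap<>();
        for (Commit c : commits) {
            List<Object> key = List.of(c.source, c.destination, c.issue);
            merged.merge(key, c, (a, b) -> new Commit(a.source, a.destination, a.issue,
                    a.added + b.added, a.removed + b.removed));
        }
        return new ArrayList<>(merged.values());
    }

    public static void main(String[] args) {
        List<Commit> input = List.of(
                new Commit("Dave", "r1", 1, 20, 5),
                new Commit("Dave", "r1", 1, 6, 8),
                new Commit("Dave", "r1", 2, 60, 4),
                new Commit("Dave", "r1", 2, 35, 10));
        // Prints issue 1: added=26, removed=13 then issue 2: added=95, removed=14
        for (Commit c : aggregate(input)) {
            System.out.println("issue " + c.issue + ": added=" + c.added + ", removed=" + c.removed);
        }
    }
}
```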
Expanded Example
The example from the first section is a good demonstration of how aggregation works, but just having the total of some properties may not be the most useful. To demonstrate a more complex use case, we will modify the example to add some new properties to the edges so that, after aggregation, we have this graph:
graph LR
A(["Person
ID: Dave"])
--
"Commit
first: 2015-12-25
latest: 2023-01-01
count: 3"
-->
B(["Repository
ID: r1"])
What we are doing with this graph is aggregating any new Commit edges so that the first and latest commit dates are kept up to date as new edges are added to the Graph, whilst incrementing a count property to indicate how many Commit edges there are in total between the two vertices.
We will modify the schema from the basic example to add the new properties and set up their aggregation functions:
Tip
As good practice, we have also added some validateFunctions to give a minimum level of confidence in the values of the types. Please see the predicates reference guide for more information.
{
"edges": {
"Commit": {
"source": "id.person.string",
"destination": "id.repo.string",
"directed": "true",
"properties": {
"first": "property.date.first",
"latest": "property.date.latest",
"count": "property.integer.count"
}
}
},
"entities": {
"Person": {
"description": "Entity representing a person vertex",
"vertex": "id.person.string"
},
"Repository": {
"description": "Entity representing a repository vertex",
"vertex": "id.repo.string"
}
}
}
{
"types": {
"id.person.string": {
"description": "A basic type to hold the string id of a person entity",
"class": "java.lang.String"
},
"id.repo.string": {
"description": "A basic type to hold the string id of a repository entity",
"class": "java.lang.String"
},
"property.integer.count": {
"description": "A basic type to hold a count property that must be greater than or equal to 0",
"class": "java.lang.Integer",
"aggregateFunction": {
"class": "uk.gov.gchq.koryphe.impl.binaryoperator.Sum"
},
"validateFunctions": [
{
"class": "uk.gov.gchq.koryphe.impl.predicate.IsMoreThan",
"orEqualTo": true,
"value": 0
}
]
},
"property.date.first": {
"description": "A Date type to hold first date property after aggregation",
"class": "java.util.Date",
"aggregateFunction": {
"class": "uk.gov.gchq.koryphe.impl.binaryoperator.Min"
},
"validateFunctions": [
{
"class": "uk.gov.gchq.koryphe.impl.predicate.Exists"
}
]
},
"property.date.latest": {
"description": "A Date type to hold latest date property after aggregation",
"class": "java.util.Date",
"aggregateFunction": {
"class": "uk.gov.gchq.koryphe.impl.binaryoperator.Max"
},
"validateFunctions": [
{
"class": "uk.gov.gchq.koryphe.impl.predicate.Exists"
}
]
},
"true": {
"description": "A simple boolean that must always be true.",
"class": "java.lang.Boolean",
"validateFunctions": [
{
"class": "uk.gov.gchq.koryphe.impl.predicate.IsTrue"
}
]
}
}
}
As you can see in the types schema, we have applied the Min function to the property.date.first type so that it will always be aggregated to the earliest date. Similarly, we apply the Max function to the property.date.latest type to always give us the latest date. The property.integer.count type keeps the simple Sum function to maintain a total of the number of edges.
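The same behaviour can be sketched in plain Java, assuming JDK equivalents in place of Koryphe's classes: BinaryOperator.minBy and maxBy with natural Date ordering stand in for Min and Max, and Integer::sum stands in for Sum. CommitProps is a hypothetical holder for one edge's properties. Because all three operators are associative and commutative, the folded result does not depend on the order in which edges arrive.

```java
import java.util.Comparator;
import java.util.Date;
import java.util.List;
import java.util.function.BinaryOperator;

// Plain-Java model of the expanded example's aggregation. Koryphe's Min, Max and
// Sum are swapped for JDK BinaryOperator equivalents; this is NOT Gaffer's code.
final class FirstLatestCountSketch {

    // Hypothetical stand-in for the properties of one Commit edge.
    static final class CommitProps {
        final Date first;
        final Date latest;
        final int count;

        CommitProps(Date first, Date latest, int count) {
            this.first = first;
            this.latest = latest;
            this.count = count;
        }
    }

    static final BinaryOperator<Date> MIN = BinaryOperator.minBy(Comparator.<Date>naturalOrder());
    static final BinaryOperator<Date> MAX = BinaryOperator.maxBy(Comparator.<Date>naturalOrder());
    static final BinaryOperator<Integer> SUM = Integer::sum;

    // Applies each property's aggregator to a pair of edges.
    static CommitProps merge(CommitProps a, CommitProps b) {
        return new CommitProps(MIN.apply(a.first, b.first),
                MAX.apply(a.latest, b.latest),
                SUM.apply(a.count, b.count));
    }

    // Folds any number of edges into one set of aggregated properties.
    static CommitProps aggregate(List<CommitProps> edges) {
        return edges.stream()
                .reduce(FirstLatestCountSketch::merge)
                .orElseThrow(() -> new IllegalArgumentException("no edges to aggregate"));
    }

    // One un-aggregated edge: first and latest hold the same time, count is 1.
    static CommitProps single(long epochMillis) {
        return new CommitProps(new Date(epochMillis), new Date(epochMillis), 1);
    }

    public static void main(String[] args) {
        CommitProps total = aggregate(List.of(
                single(1451044800146L), single(1514808000146L), single(1672574400146L)));
        // Prints 1451044800146 1672574400146 3
        System.out.println(total.first.getTime() + " " + total.latest.getTime() + " " + total.count);
    }
}
```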
After applying these schemas to a Graph, we can add the following elements to demonstrate the aggregation in practice:
Note
The dates in the JSON are in milliseconds since the Unix epoch, rather than a typical format like dd/mm/yyyy, due to how Jackson serialises java.util.Date types.
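If you need to produce or check these values, the JDK's java.time.Instant converts between epoch milliseconds and ISO-8601 UTC timestamps. The EpochMillisSketch class and its helper names are only illustrative.

```java
import java.time.Instant;
import java.util.Date;

// Illustrative helpers for converting between the epoch-millisecond values used
// in the JSON and human-readable ISO-8601 UTC timestamps, using only the JDK.
final class EpochMillisSketch {

    static String toIso(long epochMillis) {
        return Instant.ofEpochMilli(epochMillis).toString();
    }

    static long fromIso(String isoInstant) {
        return Instant.parse(isoInstant).toEpochMilli();
    }

    public static void main(String[] args) {
        long first = 1451044800146L;
        // Prints 2015-12-25T12:00:00.146Z
        System.out.println(toIso(first));
        // java.util.Date wraps the same millisecond count, so this prints true.
        System.out.println(new Date(first).getTime() == first);
    }
}
```

The three timestamps used below correspond to midday UTC on 2015-12-25, 2018-01-01 and 2023-01-01.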
{
"class": "AddElements",
"input": [
{
"class": "Edge",
"group": "Commit",
"source": "Dave",
"destination": "r1",
"directed": true,
"properties": {
"first": {
"java.util.Date": 1451044800146
},
"latest": {
"java.util.Date": 1451044800146
},
"count": 1
}
},
{
"class": "Edge",
"group": "Commit",
"source": "Dave",
"destination": "r1",
"directed": true,
"properties": {
"first": {
"java.util.Date": 1514808000146
},
"latest": {
"java.util.Date": 1514808000146
},
"count": 1
}
},
{
"class": "Edge",
"group": "Commit",
"source": "Dave",
"destination": "r1",
"directed": true,
"properties": {
"first": {
"java.util.Date": 1672574400146
},
"latest": {
"java.util.Date": 1672574400146
},
"count": 1
}
},
{
"class": "Entity",
"group": "Person",
"vertex": "Dave"
},
{
"class": "Entity",
"group": "Repository",
"vertex": "r1"
}
]
}
Tip
Loading the elements like this is just for demonstration purposes and can look a little unintuitive, as each edge has the same value for its first and latest properties. In production you may want to create a custom ElementGenerator so that elements are created correctly from your raw data based on the graph schema.
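As a rough illustration of the mapping such a generator would perform, the plain-Java sketch below turns hypothetical raw commit records into Commit edges with first and latest both set to the commit time and count set to 1. RawCommit and CommitEdge are assumed stand-ins; the sketch deliberately avoids Gaffer's generator API, and a real generator would probably also emit the Person and Repository entities.

```java
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

// Plain-Java sketch of the mapping a custom element generator could perform for
// this schema. RawCommit and CommitEdge are hypothetical; no Gaffer API is used.
final class CommitGeneratorSketch {

    // Hypothetical raw record, e.g. one parsed entry from a commit log.
    static final class RawCommit {
        final String author;
        final String repo;
        final long timestampMillis;

        RawCommit(String author, String repo, long timestampMillis) {
            this.author = author;
            this.repo = repo;
            this.timestampMillis = timestampMillis;
        }
    }

    // Hypothetical stand-in for an edge in the "Commit" group.
    static final class CommitEdge {
        final String source;
        final String destination;
        final Date first;
        final Date latest;
        final int count;

        CommitEdge(String source, String destination, Date first, Date latest, int count) {
            this.source = source;
            this.destination = destination;
            this.first = first;
            this.latest = latest;
            this.count = count;
        }
    }

    // One raw commit becomes one edge: first and latest both hold the commit time
    // (as separate Date objects, since Date is mutable) and count starts at 1.
    static CommitEdge toEdge(RawCommit raw) {
        return new CommitEdge(raw.author, raw.repo,
                new Date(raw.timestampMillis), new Date(raw.timestampMillis), 1);
    }

    static List<CommitEdge> generate(Iterable<RawCommit> raws) {
        List<CommitEdge> edges = new ArrayList<>();
        for (RawCommit raw : raws) {
            edges.add(toEdge(raw));
        }
        return edges;
    }

    public static void main(String[] args) {
        List<CommitEdge> edges = generate(List.of(
                new RawCommit("Dave", "r1", 1451044800146L),
                new RawCommit("Dave", "r1", 1672574400146L)));
        for (CommitEdge e : edges) {
            System.out.println(e.source + " -> " + e.destination + " at " + e.first.getTime());
        }
    }
}
```

Once edges shaped like this are added, the schema's Min, Max and Sum functions take care of the rest.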
Now, running a query on these elements with "Dave" as the seed, we get back one edge with aggregated properties holding the first and latest commit times, as well as a count of the number of Commit edges that have been added.
[
{
"class": "uk.gov.gchq.gaffer.data.element.Entity",
"group": "Person",
"vertex": "Dave",
"properties": {}
},
{
"class": "uk.gov.gchq.gaffer.data.element.Edge",
"group": "Commit",
"source": "Dave",
"destination": "r1",
"directed": true,
"matchedVertex": "SOURCE",
"properties": {
"count": 3,
"first": {
"java.util.Date": 1451044800146
},
"latest": {
"java.util.Date": 1672574400146
}
}
}
]