Kafka GraphDB connector

Overview and features

The Kafka connector provides a means of synchronizing changes to the RDF model to any Kafka consumer, allowing the consumer to stay automatically up-to-date with the GraphDB repository data.

The Connectors provide synchronization at the entity level, where an entity is defined as having a unique identifier (an IRI) and a set of properties and property values. In terms of RDF, this corresponds to a set of triples that have the same subject. In addition to simple properties (defined by a single triple), the Connectors support property chains. A property chain is defined as a sequence of triples where each triple’s object is the subject of the following triple.

On the Kafka side, the RDF entities are translated to JSON documents.

The main features of the Kafka Connector are:

  • maintaining a Kafka topic that is always in sync with the data stored in GraphDB;

  • multiple independent instances per repository;

  • the entities for synchronization are defined by:

    • a list of fields (on the Kafka side) and property chains (on the GraphDB side) whose values will be synchronized;

    • a list of rdf:type’s of the entities for synchronization;

    • a list of languages for synchronization (the default is all languages);

    • additional filtering by property and value.

Unlike the Elasticsearch, Solr, and Lucene connectors, the Kafka connector does not have a query interface since Kafka is a simple message queue and does not provide search functionality.

Each feature is described in detail below.

In terms of Kafka terminology and behavior:

  • Each connector instance must be assigned to a fixed Kafka topic.

  • The connector is a Kafka producer, and does not have any information about the Kafka consumers.

  • The partitions are assigned by the Kafka framework and not the connector.

Usage

All interactions with the Kafka GraphDB Connector are done through SPARQL queries.

There are three types of SPARQL queries:

  • INSERT for creating and deleting connector instances;

  • SELECT for listing connector instances and querying their configuration parameters;

  • INSERT for storing data as part of the normal GraphDB data workflow.

In general, this follows the usual SPARQL semantics: INSERT adds or modifies data, and SELECT queries existing data.

Each connector implementation defines its own IRI prefix to distinguish it from other connectors. For the Kafka GraphDB Connector, this is http://www.ontotext.com/connectors/kafka#. Each command or predicate executed by the connector uses this prefix, e.g., http://www.ontotext.com/connectors/kafka#createConnector to create a connector instance for Kafka.

Individual instances of a connector are distinguished by unique names that are also IRIs. They have their own prefix to avoid clashing with any of the command predicates. For Kafka, the instance prefix is http://www.ontotext.com/connectors/kafka/instance#.

Sample data

All examples use the following sample data, which describes five fictitious wines: Yoyowine, Franvino, Noirette, Blanquito, and Rozova, as well as the grape varieties required to make these wines. The minimum required ruleset level in GraphDB is RDFS.

@prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> .
@prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#> .
@prefix xsd: <http://www.w3.org/2001/XMLSchema#> .
@prefix wine: <http://www.ontotext.com/example/wine#> .

wine:RedWine rdfs:subClassOf wine:Wine .
wine:WhiteWine rdfs:subClassOf wine:Wine .
wine:RoseWine rdfs:subClassOf wine:Wine .

wine:Merlo
    rdf:type wine:Grape ;
    rdfs:label "Merlo" .

wine:CabernetSauvignon
    rdf:type wine:Grape ;
    rdfs:label "Cabernet Sauvignon" .

wine:CabernetFranc
    rdf:type wine:Grape ;
    rdfs:label "Cabernet Franc" .

wine:PinotNoir
    rdf:type wine:Grape ;
    rdfs:label "Pinot Noir" .

wine:Chardonnay
    rdf:type wine:Grape ;
    rdfs:label "Chardonnay" .

wine:Yoyowine
    rdf:type wine:RedWine ;
    wine:madeFromGrape wine:CabernetSauvignon ;
    wine:hasSugar "dry" ;
    wine:hasYear "2013"^^xsd:integer .

wine:Franvino
    rdf:type wine:RedWine ;
    wine:madeFromGrape wine:Merlo ;
    wine:madeFromGrape wine:CabernetFranc ;
    wine:hasSugar "dry" ;
    wine:hasYear "2012"^^xsd:integer .

wine:Noirette
    rdf:type wine:RedWine ;
    wine:madeFromGrape wine:PinotNoir ;
    wine:hasSugar "medium" ;
    wine:hasYear "2012"^^xsd:integer .

wine:Blanquito
    rdf:type wine:WhiteWine ;
    wine:madeFromGrape wine:Chardonnay ;
    wine:hasSugar "dry" ;
    wine:hasYear "2012"^^xsd:integer .

wine:Rozova
    rdf:type wine:RoseWine ;
    wine:madeFromGrape wine:PinotNoir ;
    wine:hasSugar "medium" ;
    wine:hasYear "2013"^^xsd:integer .

Setup and maintenance

Prerequisites

Third-party component versions

This version of the Kafka GraphDB Connector uses Kafka version 2.7.0.

Creating a connector instance

Creating a connector instance is done by sending a SPARQL query with the following configuration data:

  • the name of the connector instance (e.g., my_index);

  • a Kafka node and topic to synchronize to;

  • classes to synchronize;

  • properties to synchronize.

The configuration data has to be provided as a JSON string representation and passed together with the create command.

If you create the connector via the Workbench, regardless of the method you use, a pop-up screen will show you the connector creation progress.

Using the Workbench

  1. Go to Setup ‣ Connectors.

  2. Click New Connector in the tab of the respective Connector type you want to create.

  3. Fill in the configuration form.

  4. Execute the CREATE statement from the form by clicking OK. Alternatively, you can view its SPARQL query by clicking View SPARQL Query, and then copy it to execute it manually or integrate it in automation scripts.

Using the create command

The create command is triggered by a SPARQL INSERT with the createConnector predicate. For example, the following creates a connector instance called my_index, which synchronizes the wines from the sample data above:

PREFIX kafka: <http://www.ontotext.com/connectors/kafka#>
PREFIX kafka-inst: <http://www.ontotext.com/connectors/kafka/instance#>

INSERT DATA {
    kafka-inst:my_index kafka:createConnector '''
{
  "kafkaNode": "localhost:9092",
  "kafkaTopic": "my_index",
  "types": [
    "http://www.ontotext.com/example/wine#Wine"
  ],
  "fields": [
    {
      "fieldName": "grape",
      "propertyChain": [
        "http://www.ontotext.com/example/wine#madeFromGrape"
      ]
    },
    {
      "fieldName": "sugar",
      "propertyChain": [
        "http://www.ontotext.com/example/wine#hasSugar"
      ]
    },
    {
      "fieldName": "year",
      "propertyChain": [
        "http://www.ontotext.com/example/wine#hasYear"
      ]
    }
  ]
}
''' .
}

The above command creates a new Kafka connector instance that connects to the Kafka instance accessible at port 9092 on the localhost, as specified by the kafkaNode key.

The "types" key defines the RDF type of the entities to synchronize and, in the example, it is only entities of the type http://www.ontotext.com/example/wine#Wine (and its subtypes). The "fields" key defines the mapping from RDF to Kafka. The basic building block is the property chain, i.e., a sequence of RDF properties where the object of each property is the subject of the following property. In the example, three bits of information are mapped - the grape the wines are made of, sugar content, and year. Each chain is assigned a short and convenient field name: “grape”, “sugar”, and “year”. The field names are later used in the queries.

Grape is an example of a property chain composed of more than one property. First, we take the wine’s madeFromGrape property, the object of which is an instance of the type Grape, and then we take the rdfs:label of this instance. Sugar and year are both composed of a single property that links the value directly to the wine.
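
For illustration, with the configuration above, an update touching wine:Franvino would produce a Kafka record keyed by the wine's IRI with a JSON document value roughly like the following (a sketch; the exact JSON types follow the Datatype mapping rules described below, so the year may be serialized as a number or a string):

key:   http://www.ontotext.com/example/wine#Franvino
value: {
  "grape": ["Merlo", "Cabernet Franc"],
  "sugar": "dry",
  "year": 2012
}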

Working with a secured Kafka broker

GraphDB can connect to a secured Kafka broker using the SASL/PLAIN authentication mechanism. To configure it, set the kafkaPlainAuthUsername and kafkaPlainAuthPassword parameters. Since the password will be transmitted in clear text, it is recommended to enable SSL on the Kafka broker, and accordingly set the kafkaSSL parameter to true.
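
For example, the relevant part of a connector configuration for a secured broker might look like this (the host, username, and password are placeholders):

{
  "kafkaNode": "broker.example.com:9093",
  "kafkaTopic": "my_index",
  "kafkaSSL": true,
  "kafkaPlainAuthUsername": "graphdb",
  "kafkaPlainAuthPassword": "my-secret-password",
  ...
}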

There is no explicitly configurable support for the other authentication mechanisms supported by Kafka. It should be possible to configure most of them by supplying the relevant Kafka producer properties via the kafkaProducerConfig parameter.
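
For instance, SASL/SCRAM authentication could likely be configured by passing the standard Kafka client properties through kafkaProducerConfig (a sketch; the exact properties and values depend on your broker setup):

{
  ...
  "kafkaProducerConfig": {
    "security.protocol": "SASL_SSL",
    "sasl.mechanism": "SCRAM-SHA-256",
    "sasl.jaas.config": "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"graphdb\" password=\"my-secret-password\";"
  },
  ...
}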

Dropping a connector instance

Dropping a connector instance removes all references to its external store from GraphDB as well as the Kafka index associated with it.

The drop command is triggered by a SPARQL INSERT with the dropConnector predicate where the name of the connector instance has to be in the subject position, e.g., this removes the connector my_index:

PREFIX kafka: <http://www.ontotext.com/connectors/kafka#>
PREFIX kafka-inst: <http://www.ontotext.com/connectors/kafka/instance#>

INSERT DATA {
  kafka-inst:my_index kafka:dropConnector [] .
}

You can also force drop a connector in case a normal delete does not work. The force delete will remove the connector even if part of the operation fails. Go to Setup ‣ Connectors where you will see the already existing connectors that you have created. Click the delete icon, and check Force delete in the dialog box.

_images/connectors_force_delete.png

Retrieving the create options for a connector instance

You can view the options string that was used to create a particular connector instance with the following query:

PREFIX kafka: <http://www.ontotext.com/connectors/kafka#>
PREFIX kafka-inst: <http://www.ontotext.com/connectors/kafka/instance#>

SELECT ?createString {
  kafka-inst:my_index kafka:listOptionValues ?createString .
}

Listing available connector instances

In the Connectors management view

Existing Connector instances are shown under Existing connectors (below the New Connector button). Click the name of an instance to view its configuration and SPARQL query, or click the repair / delete icons to perform these operations.

_images/connectors.png

With a SPARQL query

Listing connector instances returns all previously created instances. It is a SELECT query with the listConnectors predicate:

PREFIX kafka: <http://www.ontotext.com/connectors/kafka#>

SELECT ?cntUri ?cntStr {
  ?cntUri kafka:listConnectors ?cntStr .
}

?cntUri is bound to the prefixed IRI of the connector instance that was used during creation, e.g., http://www.ontotext.com/connectors/kafka/instance#my_index, while ?cntStr is bound to a string, representing the part after the prefix, e.g., "my_index".

Instance status check

The internal state of each connector instance can be queried using a SELECT query and the connectorStatus predicate:

PREFIX kafka: <http://www.ontotext.com/connectors/kafka#>

SELECT ?cntUri ?cntStatus {
  ?cntUri kafka:connectorStatus ?cntStatus .
}

?cntUri is bound to the prefixed IRI of the connector instance, while ?cntStatus is bound to a string representation of the status of the connector represented by this IRI. The status is key-value based.

Working with data

Adding, updating, and deleting data

From the user's point of view, all synchronization happens transparently without using any additional predicates or naming a specific store explicitly, i.e., you simply execute standard SPARQL INSERT/DELETE queries. This is achieved by intercepting all changes in the plugin and determining which abstract documents need to be updated.
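
For example, inserting a new wine with a plain SPARQL update is enough for the my_index connector created earlier to send an updated record to its Kafka topic (wine:Grandio is a made-up wine used only for illustration):

PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
PREFIX xsd: <http://www.w3.org/2001/XMLSchema#>
PREFIX wine: <http://www.ontotext.com/example/wine#>

INSERT DATA {
    wine:Grandio
        rdf:type wine:RedWine ;
        wine:madeFromGrape wine:Merlo ;
        wine:hasSugar "dry" ;
        wine:hasYear "2015"^^xsd:integer .
}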

List of creation parameters

The creation parameters define how a connector instance is created by the :createConnector predicate. Some are required and some are optional. All parameters are provided together in a JSON object, where the parameter names are the object keys. Parameter values may be simple JSON values such as a string or a boolean, or they can be lists or objects.

All of the creation parameters can also be set conveniently from the Create Connector user interface without any knowledge of JSON.

readonly (boolean), optional, read-only mode

A read-only connector will index all existing data in the repository at creation time, but, unlike non-read-only connectors, it will:

  • Not react to updates. Changes will not be synced to the connector.

  • Not keep any extra structures (such as the internal Lucene index for tracking updates to chains)

The only way to index changes in data after the connector has been created is to repair (or drop/recreate) the connector.

importGraph (boolean), optional, a virtual graph containing data from which to create the connector

Creates a connector whose data will come from statements inserted into a special virtual graph instead of data contained in the repository. The virtual graph is kafka:graph, where kafka: is the control prefix http://www.ontotext.com/connectors/kafka#. The data have to be inserted into this graph before the connector create statement is executed.

Both the insertion into the special graph and the create statement must be in the same transaction. In the GraphDB Workbench, this can be done by pasting them one after another in the SPARQL editor and putting a semicolon at the end of the first INSERT. This functionality requires read-only mode.

PREFIX kafka: <http://www.ontotext.com/connectors/kafka#>
INSERT {
    GRAPH kafka:graph {
        ...
    }
} WHERE {
        ...
};
PREFIX kafka: <http://www.ontotext.com/connectors/kafka#>
PREFIX kafka-inst:<http://www.ontotext.com/connectors/kafka/instance#>
INSERT DATA {
    kafka-inst:my_index kafka:createConnector '''
{
  "readonly": true,
  "importGraph": true,
  "fields": [],
  "languages": [],
  "types": [],
}
''' .
}
importFile (string), optional, an RDF file with data from which to create the connector

Creates a connector whose data will come from an RDF file on the file system instead of data contained in the repository. The value must be the full path to the RDF file. This functionality requires readonly mode.
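
A minimal sketch of such a configuration (the file path is a placeholder):

PREFIX kafka: <http://www.ontotext.com/connectors/kafka#>
PREFIX kafka-inst: <http://www.ontotext.com/connectors/kafka/instance#>

INSERT DATA {
    kafka-inst:my_index kafka:createConnector '''
{
  "readonly": true,
  "importFile": "/path/to/data.ttl",
  "kafkaNode": "localhost:9092",
  "kafkaTopic": "my_index",
  "types": [
    "http://www.ontotext.com/example/wine#Wine"
  ],
  "fields": [
    {
      "fieldName": "sugar",
      "propertyChain": [
        "http://www.ontotext.com/example/wine#hasSugar"
      ]
    }
  ]
}
''' .
}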

detectFields (boolean), optional, detects fields

This mode introduces automatic field detection when creating a connector. Instead of providing fields: [ ... ] in the JSON, you can skip this and get automatic fields where each field will have a single predicate chain and its field name will be the same as the predicate.

In this mode, specifying types is optional too. If types are not provided, then all types will be indexed. This mode requires importGraph or importFile.

Once the connector is created, you can inspect the detected fields in the Connector management section of the Workbench.
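
A sketch of a connector that detects its fields automatically from an imported file (the path is a placeholder; note that fields and types are omitted):

{
  "readonly": true,
  "importFile": "/path/to/data.ttl",
  "detectFields": true,
  "kafkaNode": "localhost:9092",
  "kafkaTopic": "my_index"
}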

kafkaNode (string), required, the Kafka instance to sync to

As Kafka is a third-party service, you have to specify the node where it is running. The node value is of the form hostname:port, e.g., localhost:9092. No default value.

kafkaTopic (string), required, the Kafka topic to send documents to.

No default value.

kafkaSSL (boolean), optional, controls whether to use an SSL connection to the Kafka broker.

False by default.

kafkaPlainAuthUsername (string), optional, supplies the username for Kafka SASL PLAIN authentication.

No default value.

kafkaPlainAuthPassword (string), optional, supplies the password for Kafka SASL PLAIN authentication.

No default value.

bulkUpdateBatchSize (integer), controls the maximum batch size in bytes and corresponds to Kafka producer config batch.size.

Default value is 1,048,576 (1 megabyte).

bulkUpdateRequestSize (integer), controls the maximum request size (and consequently the maximum size per document) in bytes.

Default value is 1,048,576 (1 megabyte).

kafkaCompressionType (string), sets the compression to use when sending documents to Kafka.

One of none, gzip, lz4, or snappy; the default is snappy. This corresponds to the Kafka producer config property compression.type.

kafkaProducerId (string), an optional identifier that allows for separate Kafka producers with different options to the same Kafka broker.

No default – all instances to the same Kafka broker will use a shared Kafka producer and thus must have the same options. See also Producer sharing and Conflict resolution.

kafkaProducerConfig (JSON), optional, the settings for creating the Kafka producer.

This option is passed directly to the Kafka producer when it is instantiated. Each key is a Kafka producer configuration property. Some config keys, e.g., transactional.id, are not allowed here. No default.

kafkaIgnoreDeleteAll (boolean), optional, a flag that, when selected, will not notify Kafka when all repository statements are removed.

GraphDB handles the removal of all statements as a special operation that is manifested as sending a Kafka record with NULL key and NULL value. If this flag is true, no such record will be sent. False by default.

kafkaPropagateConfig (boolean), optional, a non-persisted flag that, when selected, will force propagating the Kafka config to other connector instances using the same Kafka broker.

False by default. See also Producer sharing and Conflict resolution.

types (list of IRIs), required, specifies the types of entities to sync

The RDF types of entities to sync are specified as a list of IRIs. At least one type IRI is required.

Use the pseudo-IRI $any to sync entities that have at least one RDF type.

Use the pseudo-IRI $untyped to sync entities regardless of whether they have any RDF type.
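
For example, to sync only wines, to sync any entity that has a type, or to sync every entity regardless of type, respectively:

"types": ["http://www.ontotext.com/example/wine#Wine"]

"types": ["$any"]

"types": ["$untyped"]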

languages (list of strings), optional, valid languages for literals

RDF data is often multilingual, but only some of the languages represented in the literal values can be mapped. This can be done by specifying a list of language ranges to be matched to the language tags of literals according to RFC 4647, Section 3.3.1. Basic Filtering. In addition, an empty range can be used to include literals that have no language tag. The list of language ranges maps all existing literals that have matching language tags.
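
For example, to map only English and German literals as well as literals that have no language tag:

"languages": ["en", "de", ""]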

fields (list of field objects), required, defines the mapping from RDF to Kafka

The fields specify exactly which parts of each entity will be synchronized as well as the specific details on the connector side. The field is the smallest synchronization unit and it maps a property chain from GraphDB to a field in Kafka. The fields are specified as a list of field objects. At least one field object is required. Each field object has further keys that specify details.

  • fieldName (string), required, the name of the field in Kafka

    The name of the field defines the mapping on the connector side. It is specified by the key fieldName with a string value. The field name is used as the key in the JSON document that will be sent to Kafka.

  • fieldNameTransform (one of none, predicate, or predicate.localName), optional, none by default

    The field name transform defines an optional transformation of the field name.

    • none: The field name is supplied via the fieldName option.

    • predicate: The field name is derived from the full IRI of the last predicate of the chain, e.g., if the last predicate was http://www.w3.org/2000/01/rdf-schema#label, then the field name will be http://www.w3.org/2000/01/rdf-schema#label too.

    • predicate.localName: The field name is derived from the local name of the IRI of the last predicate of the chain, e.g., if the last predicate was http://www.w3.org/2000/01/rdf-schema#comment, then the field name will be comment.

    See Indexing all literals in distinct fields for an example.

  • propertyChain (list of IRIs), required, defines the property chain to reach the value

    The property chain defines the mapping on the GraphDB side. A property chain is defined as a sequence of triples where the entity IRI is the subject of the first triple, its object is the subject of the next triple, etc. In this model, a property chain with a single element corresponds to a direct property defined by a single triple. Property chains are specified as a list of IRIs where at least one IRI must be provided.

    The IRI of the document will be synchronized as the key in the Kafka record.

    See Copy fields for defining multiple fields with the same property chain.

    See Multiple property chains per field for defining a field whose values are populated from more than one property chain.

    See Indexing language tags for defining a field whose values are populated with the language tags of literals.

    See Indexing the IRI of an entity for defining a field whose values are populated with the IRI of the indexed entity.

    See Wildcard literal indexing for defining a field whose values are populated with literals regardless of their predicate.

  • defaultValue (string), optional, specifies a default value for the field

    The default value (defaultValue) provides a means of specifying a default value for the field when the property chain has no matching values in GraphDB. The default value can be a plain literal, a literal with a datatype (xsd: prefix supported), a literal with a language tag, or an IRI. There is no default value.

  • indexed (boolean), optional, default true

    If indexed, a field will be included in the JSON document sent to Kafka. True by default.

    If true, this option corresponds to "index" = true. If false, it corresponds to "index" = false.

  • multivalued (boolean), optional, default true

    RDF properties and synchronized fields may have more than one value. If multivalued is set to true, all values will be synchronized to Kafka. If set to false, only a single value will be synchronized. True by default.

  • ignoreInvalidValues (boolean), optional, default false

    Per-field option that controls what happens when a value cannot be converted to the requested (or previously detected) type. False by default.

    Example use: when an invalid date literal like “2021-02-29”^^xsd:date (2021 is not a leap year) needs to be indexed as a date, or when an IRI needs to be indexed as a number.

    Note that some conversions are always valid, for example a literal or an IRI to a string field. When true, such values will be skipped with a note in the logs. When false, such values will break the transaction.

  • array (boolean), optional, default false

    Normally, Kafka creates an array only if more than one value is present for a given field. If array is set to true, Kafka will always create an array even for single values. If set to false, Kafka will create arrays for multiple values only. False by default.

  • datatype (string), optional, the manual datatype override

    By default, the Kafka GraphDB Connector uses the datatype of literal values to determine how they should be mapped to Kafka types. For more information on the supported datatypes, see Datatype mapping.

    The mapping can be overridden through the property "datatype", which can be specified per field. The value of datatype can be any of the xsd: types supported by the automatic mapping or a native Kafka type prefixed by native:, e.g., both xsd:long and native:long map to the long type in Kafka.

  • objectFields (objects array), optional, nested object mapping

    When native:object is used as a datatype value, provide a mapping for the nested object’s fields. If datatype is not provided, then native:object will be assumed.

    Nested objects support further nested objects with a limit of five levels of nesting. An example of using the native:nested type:

    Having the following data consisting of children and grandchildren relations:

    <urn:John>
        a <urn:Person> ;
        <urn:name> "John" ;
        <urn:gender> <urn:Male> ;
        <urn:age> 60 ;
        <urn:hasSpouse> <urn:Mary> ;
        <urn:hasChild> <urn:Billy> ;
        <urn:hasChild> <urn:Annie> .
    
    <urn:Mary>
        a <urn:Person> ;
        <urn:name> "Mary" ;
        <urn:gender> <urn:Female> ;
        <urn:age> 58 ;
        <urn:hasSpouse> <urn:John> ;
        <urn:hasChild> <urn:Billy> .
    
    <urn:Eva>
        a <urn:Person> ;
        <urn:name> "Eva" ;
        <urn:gender> <urn:Female> ;
        <urn:age> 45 ;
        <urn:hasChild> <urn:Annie> .
    
    <urn:Billy>
        a <urn:Person> ;
        <urn:name> "Billy" ;
        <urn:gender> <urn:Male> ;
        <urn:age> 35 ;
        <urn:hasChild> <urn:Tylor> ;
        <urn:hasChild> <urn:Melody> .
    
    <urn:Annie>
        a <urn:Person> ;
        <urn:name> "Annie" ;
        <urn:gender> <urn:Female> ;
        <urn:age> 28 ;
        <urn:hasChild> <urn:Sammy> .
    
    <urn:Tylor>
        a <urn:Person> ;
        <urn:name> "Tylor" ;
        <urn:gender> <urn:Male> ;
        <urn:age> 5 .
    
    <urn:Melody>
        a <urn:Person> ;
        <urn:name> "Melody" ;
        <urn:gender> <urn:Female> ;
        <urn:age> 2 .
    
    <urn:Sammy>
        a <urn:Person> ;
        <urn:name> "Sammy" ;
        <urn:gender> <urn:Male> ;
        <urn:age> 10 .
    
    <urn:Male> <urn:label> "male" .
    
    <urn:Female> <urn:label> "female" .
    

We can create a nested objects index that consists of children and grandchildren with their corresponding fields defining their gender and age:

{
  "fields": [
    {
      "fieldName": "name",
      "propertyChain": [
        "urn:name"
      ]
    },
    {
      "fieldName": "age",
      "propertyChain": [
        "urn:age"
      ],
      "datatype": "xsd:long"
    },
    {
      "fieldName": "hasSpouse",
      "propertyChain": [
        "urn:hasSpouse"
      ]
    },
    {
      "fieldName": "gender",
      "propertyChain": [
        "urn:gender",
        "urn:label"
      ]
    },
    {
      "fieldName": "children",
      "propertyChain": [
        "urn:hasChild"
      ],
      "datatype": "native:nested",
      "objectFields": [
        {
          "fieldName": "id",
          "propertyChain": [
            "$self"
          ]
        },
        {
          "fieldName": "name",
          "propertyChain": [
            "urn:name"
          ]
        },
        {
          "fieldName": "age",
          "propertyChain": [
            "urn:age"
          ],
          "datatype": "xsd:long"
        },
        {
          "fieldName": "gender",
          "propertyChain": [
            "urn:gender",
            "urn:label"
          ]
        },
        {
          "fieldName": "children",
          "propertyChain": [
            "urn:hasChild"
          ],
          "objectFields": [
            {
              "fieldName": "id",
              "propertyChain": [
                "$self"
              ]
            },
            {
              "fieldName": "name",
              "propertyChain": [
                "urn:name"
              ]
            },
            {
              "fieldName": "age",
              "propertyChain": [
                "urn:age"
              ],
              "datatype": "xsd:long"
            }
          ]
        }
      ]
    },
    {
      "fieldName": "grandChildren",
      "propertyChain": [
        "urn:hasChild",
        "urn:hasChild"
      ],
      "datatype": "native:nested",
      "objectFields": [
        {
          "fieldName": "id",
          "propertyChain": [
            "$self"
          ]
        },
        {
          "fieldName": "name",
          "propertyChain": [
            "urn:name"
          ]
        },
        {
          "fieldName": "age",
          "propertyChain": [
            "urn:age"
          ],
          "datatype": "xsd:long"
        },
        {
          "fieldName": "gender",
          "propertyChain": [
            "urn:gender",
            "urn:label"
          ]
        }
      ]
    }
  ],
  "types": [
    "urn:Person"
  ],
  "kafkaNode": ...,
  "kafkaTopic": ...,
  "entityFilter": "?grandChildren.id -> type in (<urn:Person>)"
}

Special field definitions

Copy fields

Often, it is convenient to synchronize the same data multiple times with different settings to accommodate different use cases. The Kafka GraphDB Connector has explicit support for fields that copy their value from another field. This is achieved by specifying a single element in the property chain of the form @otherFieldName, where otherFieldName is another non-copy field. Take the following example:

...
  "fields": [
    {
      "fieldName": "grape",
      "propertyChain": [
        "http://www.ontotext.com/example/wine#madeFromGrape",
        "http://www.w3.org/2000/01/rdf-schema#label"
      ]
    },
    {
      "fieldName": "whiteGrape",
      "propertyChain": [
        "@grape"
      ]
    }
  ],
  "entityFilter": "?whiteGrape -> type = <wine:WhiteGrape>"
...

The snippet creates a field “grape” containing all grapes, and another field “whiteGrape”. Both fields are populated with the same values initially and “whiteGrape” is defined as a copy field that refers to the field “grape”. The field “whiteGrape” is additionally filtered so that only certain grape varieties will be synchronized.

Note

The connector handles copy fields more efficiently than a field defined with exactly the same property chain as another field.

Multiple property chains per field

Sometimes, you have to work with data models that define the same concept (in terms of what you want to index in Kafka) with more than one property chain, e.g., the concept of “name” could be defined as a single canonical name, multiple historical names and some unofficial names. If you want to index these together as a single field in Kafka, you can define this as a multiple property chains field.

Fields with multiple property chains are defined as a set of separate virtual fields that will be merged into a single physical field when indexed. Virtual fields are distinguished by the suffix $xyz, where xyz is any alphanumeric sequence of convenience. For example, we can define the fields name$1 and name$2 like this:

...
  "fields": [
    {
      "fieldName": "name$1",
      "propertyChain": [
        "http://www.ontotext.com/example#canonicalName"
      ],
      "fieldName": "name$2",
      "propertyChain": [
        "http://www.ontotext.com/example#historicalName"
      ]
      ...
    },
...

The values of the fields name$1 and name$2 will be merged and synchronized to the field name in Kafka.

Note

You cannot mix suffixed and unsuffixed fields with the same name, e.g., if you defined myField$new and myField$old, you cannot have a field called just myField.

Filters and fields with multiple property chains

Filters can be used with fields defined with multiple property chains. Both the physical field values and the individual virtual field values are available:

  • Physical fields are specified without the suffix, e.g., ?myField

  • Virtual fields are specified with the suffix, e.g., ?myField$2 or ?myField$alt.

Note

Physical fields cannot be combined with parent() as their values come from different property chains. If you really need to filter the same parent level, you can rewrite parent(?myField) in (<urn:x>, <urn:y>) as parent(?myField$1) in (<urn:x>, <urn:y>) || parent(?myField$2) in (<urn:x>, <urn:y>) || parent(?myField$3) ... and surround it with parentheses if it is a part of a bigger expression.

Indexing language tags

The language tag of an RDF literal can be indexed by specifying a property chain, where the last element is the pseudo-IRI lang(). The property preceding lang() must lead to a literal value. For example:

PREFIX kafka: <http://www.ontotext.com/connectors/kafka#>
PREFIX kafka-inst: <http://www.ontotext.com/connectors/kafka/instance#>

INSERT DATA {
  kafka-inst:my_index kafka:createConnector '''
    {
      "kafkaNode": "localhost:9092",
      "kafkaTopic": "my_index",
      "types": ["http://www.ontotext.com/example#gadget"],
      "fields": [
         {
           "fieldName": "name",
           "propertyChain": [
             "http://www.ontotext.com/example#name"
           ]
         },
         {
           "fieldName": "nameLanguage",
           "propertyChain": [
             "http://www.ontotext.com/example#name",
             "lang()"
           ]
         }
      ]
    }
  ''' .
}

The above connector will index the language tag of each literal value of the property http://www.ontotext.com/example#name into the field nameLanguage.

Wildcard literal indexing

In this mode, the last element of a property chain is a wildcard that will match any predicate that leads to a literal value. Use the special pseudo-IRI $literal as the last element of the property chain to activate it.

Note

Currently, it really means any literal, including literals with data types.

For example:

{
    "fields" : [ {
        "propertyChain" : [ "$literal" ],
        "fieldName" : "name"
    }, {
        "propertyChain" : [ "http://example.com/description", "$literal" ],
        "fieldName" : "description"
    }
    ...
}

See Indexing all literals for a detailed example.

Indexing the IRI of an entity

Sometimes you may need the IRI of each entity (e.g., http://www.ontotext.com/example/wine#Franvino from our small example dataset) indexed as a regular field. This can be achieved by specifying a property chain with a single property referring to the pseudo-IRI $self. For example:

PREFIX kafka: <http://www.ontotext.com/connectors/kafka#>
PREFIX kafka-inst: <http://www.ontotext.com/connectors/kafka/instance#>

INSERT DATA {
    kafka-inst:my_index kafka:createConnector '''
{
  "kafkaNode": "localhost:9092",
  "kafkaTopic": "my_index",
  "types": [
    "http://www.ontotext.com/example/wine#Wine"
  ],
  "fields": [
    {
      "fieldName": "entityId",
      "propertyChain": [
        "$self"
      ]
    },
    {
      "fieldName": "grape",
      "propertyChain": [
        "http://www.ontotext.com/example/wine#madeFromGrape",
        "http://www.w3.org/2000/01/rdf-schema#label"
      ]
    }
  ]
}
''' .
}

The above connector will index the IRI of each wine into the field entityId.

Note

Note that GraphDB will also use the IRI of each entity as the key of the corresponding Kafka record.

Datatype mapping

The Kafka GraphDB Connector maps different types of RDF values to different types of Kafka values according to the basic type of the RDF value (IRI or literal) and the datatype of literals. The auto-detection uses the following mapping:

RDF value    RDF datatype                                 JSON type
IRI          n/a                                          string
literal      any type not explicitly mentioned below      string
literal      xsd:boolean                                  boolean
literal      xsd:double                                   number
literal      xsd:float                                    number
literal      xsd:long                                     number
literal      xsd:int                                      number
literal      xsd:dateTime                                 string in ISO format with time zone
literal      xsd:date                                     string in ISO format without time zone
literal      xsd:time                                     string in ISO format with time zone
literal      xsd:gYear                                    string in ISO format without time zone
literal      xsd:gYearMonth                               string in ISO format without time zone

Note

For any given field, the automatic mapping uses the first value it sees. This works fine for clean datasets but might lead to problems if your dataset has non-normalized data, e.g., if the first value has no datatype but other values do.

It is therefore recommended to set datatype to a fixed value, e.g., xsd:date.
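
For example, to force the year values from the sample data to be sent as numbers regardless of how the first encountered literal is typed, the field definition could fix the datatype like this:

{
  "fieldName": "year",
  "propertyChain": [
    "http://www.ontotext.com/example/wine#hasYear"
  ],
  "datatype": "xsd:long"
}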

Date and time conversion

RDF and ISO use slightly different models for representing dates and times, even though the values might look very similar.

Years in RDF values use the XSD format and are era years, where positive values denote the common era and negative values denote years before the common era. There is no year zero.

Years in the ISO format are proleptic years, i.e., positive values denote years from the common era, and earlier years simply continue downwards by one, so there is a year zero.

In short:

  • year 2020 CE = year 2020 in XSD = year 2020 in ISO.

  • year 1 CE = year 1 in XSD = year 1 in ISO.

  • year 1 BCE = year -1 in XSD = year 0 in ISO.

  • year 2 BCE = year -2 in XSD = year -1 in ISO.

All years coming from RDF literals will be converted to ISO before sending to Kafka.

Both XSD and ISO date and time values support timezones. In addition to that, XSD defines the lack of a timezone as undetermined. Since we do not want to have any undetermined state in the indexing system, we define the undetermined time zone as UTC, i.e., "2020-02-14T12:00:00"^^xsd:dateTime is equivalent to "2020-02-14T12:00:00Z"^^xsd:dateTime (Z is the UTC timezone, also known as +00:00).

Also note that XSD dates and partial dates, e.g., xsd:gYear values, may have a timezone, which leads to additional complications. E.g., "2020+02:00"^^xsd:gYear (the year 2020 in the +02:00 timezone) will be normalized to 2019-12-31T22:00:00Z (the previous year!) if strict timezone adherence is followed. We have chosen to ignore the timezone on any values that do not have an associated time value, e.g.:

  • "2020-02-15+02:00"^^xsd:date

  • "2020-02+02:00"^^xsd:gYearMonth

  • "2020+02:00"^^xsd:gYear

All of the above will be treated as if they specified UTC as their timezone.

Advanced filtering and fine tuning

entityFilter (string)

The entityFilter parameter is used to fine-tune the set of entities and/or individual values for the configured fields, based on the field value. Entities and field values are synchronized to Kafka if, and only if, they pass the filter. The entity filter is similar to a FILTER() inside a SPARQL query but not exactly the same. Each configured field can be referred to, in the entity filter, by prefixing it with a ?, much like referring to a variable in SPARQL. Several operators are supported:

?var in (value1, value2, ...)

Tests if the field var’s value is one of the specified values. Unlike the similar SPARQL operator, values are compared strictly, i.e., for literals to match, their datatypes must be exactly the same (similar to how SPARQL sameTerm works). Values that do not match are treated as if they were not present in the repository.

Example:
?status in ("active", "new")

?var not in (value1, value2, ...)

The negated version of the in-operator.

Example:
?status not in ("archived")

bound(?var)

Tests if the field var has a valid value. This can be used to make the field compulsory.

Example:
bound(?name)

?var = value (equal to)
?var != value (not equal to)
?var > value (greater than)
?var >= value (greater than or equal to)
?var < value (less than)
?var <= value (less than or equal to)

RDF value comparison operators that compare RDF values similarly to the equivalent SPARQL operators. The field var’s value will be compared to the specified RDF value. When comparing RDF values that are literals, their datatypes must be compatible, e.g., xsd:integer and xsd:long but not xsd:string and xsd:date. Values that do not match are treated as if they were not present in the repository.
Examples:
Given that height’s value is "150"^^xsd:int and dateOfBirth’s value is "1989-12-31"^^xsd:date, then:
?height = "150"^^xsd:int is TRUE
?height = "150"^^xsd:long is TRUE
?height = "150" is FALSE

?height != "151"^^xsd:int is TRUE
?height != "150" is TRUE

?height > "150"^^xsd:int is FALSE
?height >= "150"^^xsd:int is TRUE
?dateOfBirth < "1990-01-01"^^xsd:date is TRUE

regex(?var, "pattern")

or

regex(?var, "pattern", "i")

Tests if the field var’s value matches the given regular expression pattern.
If the “i” flag option is present, this indicates that the match operates in case-insensitive mode.
Values that do not match are treated as if they were not present in the repository.
Example:
regex(?name, "^mrs?", "i")

expr1 || expr2

or

expr1 or expr2

Logical disjunction of expressions expr1 and expr2.

Examples:
bound(?name) || bound(?company)
bound(?name) or bound(?company)

expr1 && expr2

or

expr1 and expr2

Logical conjunction of expressions expr1 and expr2.

Examples:
bound(?status) && ?status in ("active", "new")
bound(?status) and ?status in ("active", "new")

!expr

Logical negation of expression expr.

Example:
!bound(?company)

( expr )

Grouping of expressions

Example:
(bound(?name) or bound(?company)) && bound(?address)

Note

  • ?var in (...) filters the values of ?var and leaves only the matching values, i.e., it will modify the actual data that will be synchronized to Kafka;

  • bound(?var) checks if there is any valid value left after filtering operators such as ?var in (...) have been applied.

In addition to the operators, there are some constructions that can be used to write filters based not on the values but on values related to them:

Accessing the previous element in the chain

The construction parent(?var) is used for going to a previous level in a property chain. It can be applied recursively as many times as needed, e.g., parent(parent(parent(?var))) goes back in the chain three times. The effective value of parent(?var) can be used with the in or not in operator like this: parent(?company) in (<urn:a>, <urn:b>), or in the bound operator like this: bound(parent(?var)).

Accessing an element beyond the chain

The construction ?var -> uri (alternatively, ?var o uri or just ?var uri) is used to access additional values that are accessible through the property uri. In essence, this construction corresponds to the triple pattern ?value uri ?effectiveValue, where ?value is a value bound by the field var. The effective value of ?var -> uri can be used with the in or not in operator like this: ?company -> rdf:type in (<urn:c>, <urn:d>). It can be combined with parent() like this: parent(?company) -> rdf:type in (<urn:c>, <urn:d>). The same construction can be applied to the bound operator like this: bound(?company -> <urn:hasBranch>), or even combined with parent() like this: bound(parent(?company) -> <urn:hasGroup>).

The IRI parameter can be a full IRI within < > or the special string rdf:type (alternatively, just type), which will be expanded to http://www.w3.org/1999/02/22-rdf-syntax-ns#type.

Filtering by RDF graph

The construction graph(?var) is used for accessing the RDF graph of a field’s value. The typical use case is to sync only explicit values: graph(?a) not in (<http://www.ontotext.com/implicit>). The construction can be combined with parent() like this: graph(parent(?a)) in (<urn:a>).

Filtering by language tags

The construction lang(?var) is used for accessing the language tag of a field’s value (only RDF literals can have a language tag). The typical use case is to sync only values written in a given language: lang(?a) in ("de", "it", "no"). The construction can be combined with parent() and an element beyond the chain like this: lang(parent(?a) -> <http://www.w3.org/2000/01/rdf-schema#label>) in ("en", "bg"). Literal values without language tags can be filtered by using an empty tag: "".

Entity filters and default values

Entity filters can be combined with default values in order to get more flexible behavior.

A typical use case for an entity filter is having soft deletes, i.e., instead of deleting an entity, it is marked as deleted by the presence of a specific value for a given property.
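
As an illustration (the property example:status and its values are made up), a field with a default value combined with a filter can skip entities that were marked as deleted:

...
  "fields": [
    {
      "fieldName": "status",
      "propertyChain": ["http://www.ontotext.com/example#status"],
      "defaultValue": "active"
    },
    ...
  ],
  "entityFilter": "bound(?status) && ?status in (\"active\")"
...

Entities without an explicit status receive the default value "active" and are synchronized, while entities whose status is "deleted" fail the filter and are skipped.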

Read-only mode

A read-only connector instance will not update when data has been added to or removed from the repository. The default value is false.

Detect fields

Detects the fields from the data, and only works with import from a graph or a file. The default value is false.

Import from graph

Imports connector data from a virtual graph instead of data from the repository, and requires Read-only. The default value is false.

Basic entity filter example

Given the following RDF data:

@prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#> .
@prefix example: <http://www.ontotext.com/example#> .

# the entity below will be synchronized because it has a matching value for city: ?city in ("London")
example:alpha
    rdf:type example:gadget ;
    example:name "John Synced" ;
    example:city "London" .

# the entity below will not be synchronized because it lacks the property completely: bound(?city)
example:beta
    rdf:type example:gadget ;
    example:name "Peter Syncfree" .
# the entity below will not be synchronized because it has a different city value:
# ?city in ("London") will remove the value "Liverpool" so bound(?city) will be false
example:gamma
    rdf:type example:gadget ;
    example:name "Mary Syncless" ;
    example:city "Liverpool" .

If you create a connector instance such as:

PREFIX kafka: <http://www.ontotext.com/connectors/kafka#>
PREFIX kafka-inst: <http://www.ontotext.com/connectors/kafka/instance#>

INSERT DATA {
  kafka-inst:my_index kafka:createConnector '''
    {
      "kafkaNode": "localhost:9092",
      "kafkaTopic": "my_index",
      "types": ["http://www.ontotext.com/example#gadget"],
      "fields": [
         {
           "fieldName": "name",
           "propertyChain": ["http://www.ontotext.com/example#name"]
         },
         {
           "fieldName": "city",
           "propertyChain": ["http://www.ontotext.com/example#city"]
         }
      ],
      "entityFilter":"bound(?city) && ?city in (\\"London\\")"
    }
  ''' .
}

The entity :beta is not synchronized as it has no value for city.

To handle such cases, you can modify the connector configuration to specify a default value for city:

...
         {
           "fieldName": "city",
           "propertyChain": ["http://www.ontotext.com/example#city"],
           "defaultValue": "London"
         }
...
}

The default value is used for the entity :beta as it has no value for city in the repository. As the value is “London”, the entity is synchronized.

Advanced entity filter example

Sometimes, data represented in RDF is not well suited to map directly to non-RDF. For example, if you have news articles and they can be tagged with different concepts (locations, persons, events, etc.), one possible way to model this is a single property :taggedWith. Consider the following RDF data:

@prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> .
@prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#> .
@prefix example2: <http://www.ontotext.com/example2#> .

example2:Berlin
    rdf:type example2:Location ;
    rdfs:label "Berlin" .

example2:Mozart
    rdf:type example2:Person ;
    rdfs:label "Wolfgang Amadeus Mozart" .

example2:Einstein
    rdf:type example2:Person ;
    rdfs:label "Albert Einstein" .

example2:Cannes-FF
    rdf:type example2:Event ;
    rdfs:label "Cannes Film Festival" .

example2:Article1
    rdf:type example2:Article ;
    rdfs:comment "An article about a film about Einstein's life while he was a professor in Berlin." ;
    example2:taggedWith example2:Berlin ;
    example2:taggedWith example2:Einstein ;
    example2:taggedWith example2:Cannes-FF .

example2:Article2
    rdf:type example2:Article ;
    rdfs:comment "An article about Berlin." ;
    example2:taggedWith example2:Berlin .

example2:Article3
    rdf:type example2:Article ;
    rdfs:comment "An article about Mozart's life." ;
    example2:taggedWith example2:Mozart .

example2:Article4
    rdf:type example2:Article ;
    rdfs:comment "An article about classical music in Berlin." ;
    example2:taggedWith example2:Berlin ;
    example2:taggedWith example2:Mozart .

example2:Article5
    rdf:type example2:Article ;
    rdfs:comment "A boring article that has no tags." .

example2:Article6
    rdf:type example2:Article ;
    rdfs:comment "An article about the Cannes Film Festival in 2013." ;
    example2:taggedWith example2:Cannes-FF .

Now, if you want to map this data to Kafka so that the property :taggedWith x is mapped to separate fields taggedWithPerson and taggedWithLocation according to the type of x (we are not interested in events), you can map taggedWith twice to different fields and then use an entity filter to get the desired values:

PREFIX kafka: <http://www.ontotext.com/connectors/kafka#>
PREFIX kafka-inst: <http://www.ontotext.com/connectors/kafka/instance#>

INSERT DATA {
  kafka-inst:my_index kafka:createConnector '''
    {
      "kafkaNode": "localhost:9092",
      "kafkaTopic": "my_index",
      "types": ["http://www.ontotext.com/example2#Article"],
      "fields": [
         {
            "fieldName": "comment",
            "propertyChain": ["http://www.w3.org/2000/01/rdf-schema#comment"]
         },
         {
           "fieldName": "taggedWithPerson",
           "propertyChain": ["http://www.ontotext.com/example2#taggedWith"]
         },
         {
           "fieldName": "taggedWithLocation",
           "propertyChain": ["http://www.ontotext.com/example2#taggedWith"]
         }
      ],
      "entityFilter": "?taggedWithPerson type in (<http://www.ontotext.com/example2#Person>)
                        && ?taggedWithLocation type in (<http://www.ontotext.com/example2#Location>)"
    }
  ''' .
}

Note

type is the short way to write <http://www.w3.org/1999/02/22-rdf-syntax-ns#type>.

The six articles in the RDF data above will be mapped as such:

:Article1
    taggedWithPerson: :Einstein
    taggedWithLocation: :Berlin
    Explanation: :taggedWith has the values :Einstein, :Berlin, and :Cannes-FF. The filter leaves only the correct values in the respective fields. The value :Cannes-FF is ignored as it does not match the filter.

:Article2
    taggedWithPerson: (empty)
    taggedWithLocation: :Berlin
    Explanation: :taggedWith has the value :Berlin. After the filter is applied, only taggedWithLocation is populated.

:Article3
    taggedWithPerson: :Mozart
    taggedWithLocation: (empty)
    Explanation: :taggedWith has the value :Mozart. After the filter is applied, only taggedWithPerson is populated.

:Article4
    taggedWithPerson: :Mozart
    taggedWithLocation: :Berlin
    Explanation: :taggedWith has the values :Berlin and :Mozart. The filter leaves only the correct values in the respective fields.

:Article5
    taggedWithPerson: (empty)
    taggedWithLocation: (empty)
    Explanation: :taggedWith has no values. The filter is not relevant.

:Article6
    taggedWithPerson: (empty)
    taggedWithLocation: (empty)
    Explanation: :taggedWith has the value :Cannes-FF. The filter removes it as it does not match.

Overview of connector predicates

The following diagram shows a summary of all predicates that can administer (create, drop, repair, check status) connector instances. It can be used as a quick reference of what a particular predicate needs to be attached to. Variables that are bound as a result of a query are shown in green, blank helper nodes are shown in blue, literals in red, and IRIs in orange. The predicates are represented by labeled arrows.

scale 0.85
left to right direction

skinparam activity {
  BackgroundColor<<BNode>> #D1E0FF
  BackgroundColor<<Var>> #D1FFD1
  BackgroundColor<<IRI>> #FFCC80
  BackgroundColor #FFE3E3
}

partition "Instance level" {
  "instance IRI" <<IRI>> -->[:createConnector] "JSON params"
  "instance IRI" -->[:dropConnector] "dummy value"
  "instance IRI" -->[:repairConnector] "dummy value"
  "instance IRI" -->[:connectorStatus] "?status" <<Var>>
}

Caveats

Producer sharing

The Kafka connector aims to minimize resource usage and provide smooth transactional operation. This is achieved by using a single, shared Kafka producer object for all connector instances that are connected to the same Kafka broker node. This has the following benefits:

  • Memory consumption is reduced as each Kafka producer requires a certain amount of buffer memory.

  • A failed transaction in one Kafka connector instance will be reverted in all other Kafka connector instances together with the GraphDB transaction.

Due to the nature of Kafka producers, sharing a producer imposes a restriction as well:

  • All connector instances must use the same Kafka options, e.g., they must have the same values for the bulkUpdateBatchSize and kafkaCompressionType options.

Once you have created at least one Kafka connector instance, attempting to create another instance results in one of the following scenarios:

Different Kafka broker
  • The new connector instance specifies a different Kafka broker.

  • The connector instance will be created and a new Kafka producer will be instantiated.

Same Kafka broker + same Kafka options
  • The new connector instance specifies the same Kafka broker as one of the existing connectors and the SAME options as the existing connector.

  • The connector instance will be created, and the existing Kafka producer will be reused.

Same Kafka broker + different Kafka options
  • The new connector instance specifies the same Kafka broker as one of the existing connectors and DIFFERENT options than the existing connector.

  • The connector instance will NOT be created and an error explaining the reason will be thrown.

  • See Conflict resolution for possible workarounds.

Note

The Kafka broker for two connector instances is considered to be the same if at least one of the host/port pairs supplied via the kafkaNode option is the same.

Conflict resolution

When the attempt to create a new Kafka connector instance is denied because another instance was already created with different options, there are several ways to resolve the conflict:

Manual resolution
  • Examine the options of the new connector instance you want to create.

  • Make the options the same as those of the existing connector instance.

Propagate the new options to the existing instances
  • Set the option kafkaPropagateConfig of the new instance to true.

  • The new options will be propagated to all existing instances that share the same Kafka broker node.

Force the allocation of a new producer
  • Set the option kafkaProducerId of the new instance to some non-empty identifier.

  • This will override the producer sharing mechanism and allocate a new producer associating it with the supplied producer ID.

  • The new connector will use the new options.

  • All existing instances will continue using their previous options.
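
For example, a second connector with different producer options could be created with a configuration along these lines (a sketch; the producer ID is an arbitrary non-empty string and the topic name is a placeholder):

{
  "kafkaNode": "localhost:9092",
  "kafkaTopic": "my_other_index",
  "kafkaProducerId": "producer2",
  "kafkaCompressionType": "gzip",
  ...
}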