Kafka GraphDB Connector

Note

This feature requires a GraphDB Enterprise license.

Overview and features

The Kafka connector provides a means to synchronize changes to the RDF model to any Kafka consumer, staying automatically up-to-date with the GraphDB repository data.

Note

GraphDB supports full-text search options as well.

The Connectors provide synchronization at the entity level, where an entity is defined as having a unique identifier (an IRI) and a set of properties and property values. In terms of RDF, this corresponds to a set of triples that have the same subject. In addition to simple properties (defined by a single triple), the Connectors support property chains. A property chain is defined as a sequence of triples where each triple’s object is the subject of the following triple.

On the Kafka side, the RDF entities are translated to JSON documents.

The main features of the Kafka Connector are:

  • maintaining a Kafka topic that is always in sync with the data stored in GraphDB;

  • multiple independent instances per repository;

  • the entities for synchronization are defined by:

    • a list of fields (on the Kafka side) and property chains (on the GraphDB side) whose values will be synchronized;

    • a list of rdf:type’s of the entities for synchronization;

    • a list of languages for synchronization (the default is all languages);

    • additional filtering by property and value.

Unlike the Elasticsearch, Solr, and Lucene connectors, the Kafka connector does not have a query interface since Kafka is a simple message queue and does not provide search functionality.

Each feature is described in detail below.

In terms of Kafka terminology and behavior:

  • Each connector instance must be assigned to a fixed Kafka topic.

  • The connector is a Kafka producer, and does not have any information about the Kafka consumers.

  • The partitions are assigned by the Kafka framework and not the connector.

Usage

All interactions with the Kafka GraphDB Connector are done through SPARQL queries.

There are three types of SPARQL queries:

  • INSERT for creating, updating, and deleting connector instances;

  • SELECT for listing connector instances and querying their configuration parameters;

  • INSERT/SELECT for storing and querying data as part of the normal GraphDB data workflow.

In general, this corresponds to INSERT that adds or modifies data, and to SELECT that queries existing data.

Each connector implementation defines its own IRI prefix to distinguish it from other connectors. For the Kafka GraphDB Connector, this is http://www.ontotext.com/connectors/kafka#. Each command or predicate executed by the connector uses this prefix, e.g., http://www.ontotext.com/connectors/kafka#createConnector to create a connector instance for Kafka.

Individual instances of a connector are distinguished by unique names that are also IRIs. They have their own prefix to avoid clashing with any of the command predicates. For Kafka, the instance prefix is http://www.ontotext.com/connectors/kafka/instance#.

Sample data

All examples use the following sample data that describes five fictitious wines: Yoyowine, Franvino, Noirette, Blanquito, and Rozova, as well as the grape varieties required to make these wines. The minimum required ruleset level in GraphDB is RDFS.

@prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> .
@prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#> .
@prefix xsd: <http://www.w3.org/2001/XMLSchema#> .
@prefix wine: <http://www.ontotext.com/example/wine#> .

wine:RedWine rdfs:subClassOf wine:Wine .
wine:WhiteWine rdfs:subClassOf wine:Wine .
wine:RoseWine rdfs:subClassOf wine:Wine .

wine:Merlo
    rdf:type wine:Grape ;
    rdfs:label "Merlo" .

wine:CabernetSauvignon
    rdf:type wine:Grape ;
    rdfs:label "Cabernet Sauvignon" .

wine:CabernetFranc
    rdf:type wine:Grape ;
    rdfs:label "Cabernet Franc" .

wine:PinotNoir
    rdf:type wine:Grape ;
    rdfs:label "Pinot Noir" .

wine:Chardonnay
    rdf:type wine:Grape ;
    rdfs:label "Chardonnay" .

wine:Yoyowine
    rdf:type wine:RedWine ;
    wine:madeFromGrape wine:CabernetSauvignon ;
    wine:hasSugar "dry" ;
    wine:hasYear "2013"^^xsd:integer .

wine:Franvino
    rdf:type wine:RedWine ;
    wine:madeFromGrape wine:Merlo ;
    wine:madeFromGrape wine:CabernetFranc ;
    wine:hasSugar "dry" ;
    wine:hasYear "2012"^^xsd:integer .

wine:Noirette
    rdf:type wine:RedWine ;
    wine:madeFromGrape wine:PinotNoir ;
    wine:hasSugar "medium" ;
    wine:hasYear "2012"^^xsd:integer .

wine:Blanquito
    rdf:type wine:WhiteWine ;
    wine:madeFromGrape wine:Chardonnay ;
    wine:hasSugar "dry" ;
    wine:hasYear "2012"^^xsd:integer .

wine:Rozova
    rdf:type wine:RoseWine ;
    wine:madeFromGrape wine:PinotNoir ;
    wine:hasSugar "medium" ;
    wine:hasYear "2013"^^xsd:integer .

Setup and maintenance

Prerequisites

Third-party component versions

This version of the Kafka GraphDB Connector uses Kafka version 3.3.1.

Creating a connector instance

Creating a connector instance is done by sending a SPARQL query with the following configuration data:

  • the name of the connector instance (e.g., my_index);

  • a Kafka node and topic to synchronize to;

  • classes to synchronize;

  • properties to synchronize.

The configuration data has to be provided as a JSON string representation and passed together with the create command.

You can create connectors via a Workbench dialog or by using a SPARQL update query (create command).

If you create the connector via the Workbench, no matter which way you use, you will be presented with a pop-up screen showing you the connector creation progress.

Using the Workbench

  1. Go to Setup ‣ Connectors.

  2. Click New Connector in the tab of the respective Connector type you want to create.

  3. Fill out the configuration form.

  4. Execute the CREATE statement from the form by clicking OK. Alternatively, you can view its SPARQL query by clicking View SPARQL Query, and then copy it to execute it manually or integrate it in automation scripts.

Using the create command

The create command is triggered by a SPARQL INSERT with the kafka:createConnector predicate, e.g., it creates a connector instance called my_index, which synchronizes the wines from the sample data above.

To be able to use newlines and quotes without the need for escaping, here we use SPARQL’s multi-line string delimiter consisting of 3 apostrophes: '''...'''. You can also use 3 quotes instead: """...""".

PREFIX kafka: <http://www.ontotext.com/connectors/kafka#>
PREFIX kafka-inst: <http://www.ontotext.com/connectors/kafka/instance#>

INSERT DATA {
    kafka-inst:my_index kafka:createConnector '''
{
  "kafkaNode": "localhost:9092",
  "kafkaTopic": "my_index",
  "types": [
    "http://www.ontotext.com/example/wine#Wine"
  ],
  "fields": [
    {
      "fieldName": "grape",
      "propertyChain": [
        "http://www.ontotext.com/example/wine#madeFromGrape"
      ]
    },
    {
      "fieldName": "sugar",
      "propertyChain": [
        "http://www.ontotext.com/example/wine#hasSugar"
      ]
    },
    {
      "fieldName": "year",
      "propertyChain": [
        "http://www.ontotext.com/example/wine#hasYear"
      ]
    }
  ]
}
''' .
}

The above command creates a new Kafka connector instance that connects to the Kafka instance accessible at port 9200 on the localhost as specified by the kafkaNode key.

The "types" key defines the RDF type of the entities to synchronize and, in the example, it is only entities of the type http://www.ontotext.com/example/wine#Wine (and its subtypes if RDFS or higher-level reasoning is enabled). The "fields" key defines the mapping from RDF to Kafka. The basic building block is the property chain, i.e., a sequence of RDF properties where the object of each property is the subject of the following property. In the example, three bits of information are mapped - the grape the wines are made of, sugar content, and year. Each chain is assigned a short and convenient field name: “grape”, “sugar”, and “year”. The field names are later used in the queries.

The field grape is an example of a property chain composed of more than one property. First, we take the wine’s madeFromGrape property, the object of which is an instance of the type Grape, and then we take the rdfs:label of this instance. The fields sugar and year are both composed of a single property that links the value directly to the wine.

Working with a secured Kafka broker

GraphDB can connect to a secured Kafka broker using the SASL/PLAIN authentication mechanism. To configure it, set the kafkaPlainAuthUsername and kafkaPlainAuthPassword parameters. Since the password will be transmitted in clear text, it is recommended to enable SSL on the Kafka broker, and accordingly set the kafkaSSL parameter to true.

Instead of supplying the username and password as part of the connector instance configuration, you can also implement a custom authenticator class and set it via the authenticationConfiguratorClass option. See these connector authenticator examples for more information and example projects that implement such a custom class.

There is no explicitly configurable support for other authentication mechanism supported Kafka. It should be possible to configure most of them by supplying the relevant Kafka producer properties via the kafkaProducerConfig parameter.

Dropping a connector instance

Dropping a connector instance removes all references to its external store from GraphDB as well as the Kafka index associated with it.

The drop command is triggered by a SPARQL INSERT with the dropConnector predicate where the name of the connector instance has to be in the subject position, e.g., this removes the connector my_index:

PREFIX kafka: <http://www.ontotext.com/connectors/kafka#>
PREFIX kafka-inst: <http://www.ontotext.com/connectors/kafka/instance#>

INSERT DATA {
  kafka-inst:my_index kafka:dropConnector [] .
}

You can also force drop a connector in case a normal delete does not work. The force delete will remove the connector even if part of the operation fails. Go to Setup ‣ Connectors where you will see the already existing connectors that you have created. Click the delete icon, and check Force delete in the dialog box.

_images/connectors-force-delete.png

Retrieving the create options for a connector instance

You can view the options string that was used to create a particular connector instance with the following query:

PREFIX kafka: <http://www.ontotext.com/connectors/kafka#>
PREFIX kafka-inst: <http://www.ontotext.com/connectors/kafka/instance#>

SELECT ?createString {
  kafka-inst:my_index kafka:listOptionValues ?createString .
}

Listing available connector instances

In the Connectors management view

Existing Connector instances shown below the New Connector button. Click the name of an instance to view its configuration and SPARQL query, or click the repair / delete icons to perform these operations. Click the copy icon to copy the connector definition query to your clipboard.

With a SPARQL query

Listing connector instances returns all previously created instances. It is a SELECT query with the listConnectors predicate:

PREFIX kafka: <http://www.ontotext.com/connectors/kafka#>

SELECT ?cntUri ?cntStr {
  ?cntUri kafka:listConnectors ?cntStr .
}

?cntUri is bound to the prefixed IRI of the connector instance that was used during creation, e.g., http://www.ontotext.com/connectors/kafka/instance#my_index, while ?cntStr is bound to a string, representing the part after the prefix, e.g., "my_index".

Instance status check

The internal state of each connector instance can be queried using a SELECT query and the connectorStatus predicate:

PREFIX kafka: <http://www.ontotext.com/connectors/kafka#>

SELECT ?cntUri ?cntStatus {
  ?cntUri kafka:connectorStatus ?cntStatus .
}

?cntUri is bound to the prefixed IRI of the connector instance, while ?cntStatus is bound to a string representation of the status of the connector represented by this IRI. The status is key-value based.

Working with data

Adding, updating, and deleting data

From the user point of view, all synchronization happens transparently without using any additional predicates or naming a specific store explicitly, i.e., you must simply execute standard SPARQL INSERT/DELETE queries. This is achieved by intercepting all changes in the plugin and determining which Kafka documents need to be updated.

List of creation parameters

The creation parameters define how a connector instance is created by the kafka:createConnector predicate. Some are required and some are optional. All parameters are provided together in a JSON object, where the parameter names are the object keys. Parameter values may be simple JSON values such as a string or a boolean, or they can be lists or objects.

All of the creation parameters can also be set conveniently from the Create Connector user interface without any knowledge of JSON.

readonly (boolean), optional, read-only mode

A read-only connector will index all existing data in the repository at creation time, but, unlike non-read-only connectors, it will:

  • Not react to updates. Changes will not be synced to the connector.

  • Not keep any extra structures (such as the internal Lucene index for tracking updates to chains)

The only way to index changes in data after the connector has been created is to repair (or drop/recreate) the connector.

importGraph (boolean), optional, specifies that the RDF data from which to create the connector is in a special virtual graph

Used to make a Kafka index from temporary RDF data inserted in the same transaction. It requires read-only mode and creates a connector whose data will come from statements inserted into a special virtual graph instead of data contained in the repository. The virtual graph is kafka:graph, where the prefix kafka: is as defined before. The data have to be inserted into this graph before the connector create statement is executed.

Both the insertion into the special graph and the create statement must be in the same transaction. In GDB Workbench, this can be done by pasting them one after another in the SPARQL editor and putting a semicolon at the end of the first INSERT. This functionality requires read-only mode.

PREFIX kafka: <http://www.ontotext.com/connectors/kafka#>
INSERT {
    GRAPH kafka:graph {
        ...
    }
} WHERE {
        ...
};
PREFIX kafka: <http://www.ontotext.com/connectors/kafka#>
PREFIX kafka-inst: <http://www.ontotext.com/connectors/kafka/instance#>
INSERT DATA {
    kafka-inst:my_index kafka:createConnector '''
{
  "readonly": true,
  "importGraph": true,
  "fields": [],
  "languages": [],
  "types": [],
}
''' .
}
importFile (string), optional, an RDF file with data from which to create the connector

Creates a connector whose data will come from an RDF file on the file system instead of data contained in the repository. The value must be the full path to the RDF file. This functionality requires readonly mode.

detectFields (boolean), optional, detects fields

This mode introduces automatic field detection when creating a connector. You can omit specifying fields in JSON. Instead, you will get automatic fields: each corresponds to a single predicate, and its field name is the same as the predicate (so you need to use escaping when issuing Kafka queries).

In this mode, specifying types is optional too. If types are not provided, then all types will be indexed. This mode requires importGraph or importFile.

Once the connector is created, you can inspect the detected fields in the Connector management section of the Workbench.

kafkaNode (string), required, the Kafka instance to sync to

As Kafka is a third-party service, you have to specify the node where it is running. The format of the node value is of the form http://hostname.domain:port, https:// is allowed too. No default value. Can be updated at runtime without having to rebuild the index.

kafkaTopic (string), required, the Kafka topic to send documents to.

No default value.

kafkaSSL (boolean), optional, controls whether to use an SSL connection to the Kafka broker.

False by default. Can be updated at runtime without having to rebuild the index.

kafkaPlainAuthUsername (string), optional, supplies the username for Kafka SASL PLAIN authentication.

No default value. Can be updated at runtime without having to rebuild the index.

kafkaPlainAuthPassword (string), optional, supplies the password for Kafka SASL PLAIN authentication.

No default value. Can be updated at runtime without having to rebuild the index.

bulkUpdateBatchSize (integer), controls the maximum batch size in bytes and corresponds to Kafka producer config batch.size.

Default value is 1,048,576 (1 megabyte). Can be updated at runtime without having to rebuild the index.

bulkUpdateRequestSize (integer), controls the maximum request size (and consequently the maximum size per document) in bytes.

Default value is 1,048,576 (1 megabyte). Can be updated at runtime without having to rebuild the index.

authenticationConfiguratorClass optional, provides custom authentication behavior

kafkaCompressionType (string), sets the compression to use when sending documents to Kafka.

One of none, gzip, lz4, snappy), the default is snappy. This corresponds to Kafka producer config property compression.type. Can be updated at runtime without having to rebuild the index.

kafkaProducerId (string), an optional identifier that allows for separate Kafka producers with different options to the same Kafka broker.

No default – all instances to the same Kafka broker will use a shared Kafka producer and thus must have the same options. See also Producer sharing and Conflict resolution.

kafkaProducerConfig (JSON), optional, the settings for creating the Kafka producer.

This option is passed directly to the Kafka producer when it is instantiated. Each key is a Kafka producer configuration property. Some config keys, e.g., transactional.id, are not allowed here. No default. Can be updated at runtime without having to rebuild the index.

kafkaIgnoreDeleteAll (boolean), optional, a flag that, when selected, will not notify Kafka when all repository statements are removed.

GraphDB handles the removal of all statements as a special operation that is manifested as sending a Kafka record with NULL key and NULL value. If this flag is true, no such record will be sent. False by default.

kafkaPropagateConfig (boolean), optional, a non-persisted flag that, when selected, will force propagating the Kafka config to other connector instances using the same Kafka broker.

False by default. See also Producer sharing and Conflict resolution. Can be updated at runtime without having to rebuild the index.

types (list of IRIs), required, specifies the types of entities to sync

The RDF types of entities to sync are specified as a list of IRIs. At least one type IRI is required.

Use the pseudo-IRI $any to sync entities that have at least one RDF type.

Use the pseudo-IRI $untyped to sync entities regardless of whether they have any RDF type.

languages (list of strings), optional, valid languages for literals

RDF data is often multilingual, but only some of the languages represented in the literal values can be mapped. This can be done by specifying a list of language ranges to be matched to the language tags of literals according to RFC 4647, Section 3.3.1. Basic Filtering. In addition, an empty range can be used to include literals that have no language tag. The list of language ranges maps all existing literals that have matching language tags.

fields (list of field objects), required, defines the mapping from RDF to Kafka

The fields specify exactly which parts of each entity will be synchronized as well as the specific details on the connector side. The field is the smallest synchronization unit and it maps a property chain from GraphDB to a field in Kafka. The fields are specified as a list of field objects. At least one field object is required. Each field object has further keys that specify details.

  • fieldName (string), required, the name of the field in Kafka

    The name of the field defines the mapping on the connector side. It is specified by the key fieldName with a string value. The field name is used as the key in the JSON document that will be sent to Kafka.

  • fieldNameTransform (one of none, predicate, or predicate.localName), optional, none by default

    Defines an optional transformation of the field name. Although fieldName is always required, it is ignored if fieldNameTransform is predicate or predicate.localName.

    • none: The field name is supplied via the fieldName option.

    • predicate: The field name is equal to the full IRI of the last predicate of the chain, e.g., if the last predicate was http://www.w3.org/2000/01/rdf-schema#label, then the field name will be http://www.w3.org/2000/01/rdf-schema#label too.

    • predicate.localName: The field name is the derived from the local name of the IRI of the last predicate of the chain, e.g., if the last predicate was http://www.w3.org/2000/01/rdf-schema#comment, then the field name will be comment.

    See Indexing all literals in distinct fields for an example.

  • propertyChain (list of IRIs), required, defines the property chain to reach the value

    The property chain defines the mapping on the GraphDB side. A property chain is defined as a sequence of triples where the entity IRI is the subject of the first triple, its object is the subject of the next triple, etc. In this model, a property chain with a single element corresponds to a direct property defined by a single triple. Property chains are specified as a list of IRIs where at least one IRI must be provided.

    The IRI of the document will be synchronized as the key in the Kafka record.

    See Copy fields for defining multiple fields with the same property chain.

    See Multiple property chains per field for defining a field whose values are populated from more than one property chain.

    See Indexing language tags for defining a field whose values are populated with the language tags of literals.

    See Indexing the IRI of an entity for defining a field whose values are populated with the IRI of the indexed entity.

    See Wildcard literal indexing for defining a field whose values are populated with literals regardless of their predicate.

  • valueFilter (string), optional, specifies the value filter for the field

    See also Entity filtering.

  • documentFilter (string), optional, specifies the nested document filter for the field

    Only for fields that define nested documents). See also Entity filtering.

  • defaultValue (string), optional, specifies a default value for the field

    The default value (defaultValue) provides means for specifying a default value for the field when the property chain has no matching values in GraphDB. The default value can be a plain literal, a literal with a datatype (xsd: prefix supported), a literal with language, or a IRI. It has no default value.

  • indexed (boolean), optional, default true

    If indexed, a field will be included in the JSON document sent to Kafka. True by default.

    If true, this option corresponds to "index" = true. If false, it corresponds to "index" = false.

  • multivalued (boolean), optional, default true

    RDF properties and synchronized fields may have more than one value. If multivalued is set to true, all values will be synchronized to Kafka. If set to false, only a single value will be synchronized. True by default.

  • ignoreInvalidValues (boolean), optional, default false

    Per-field option that controls what happens when a value cannot be converted to the requested (or previously detected) type. False by default.

    Example use: when an invalid date literal like "2021-02-29"^^xsd:date (2021 is not a leap year) needs to be indexed as a date, or when an IRI needs to be indexed as a number.

    Note that some conversions are always valid, for example a literal or an IRI to a string field. When true, such values will be skipped with a note in the logs. When false, such values will break the transaction.

  • array (boolean), optional, default false

    Normally, Kafka creates an array only if more than value is present for a given field. If array is set to true, Kafka will always create an array even for single values. If set to false, Kafka will create arrays for multiple values only. False by default.

  • datatype (string), optional, the manual datatype override

    By default, the Kafka GraphDB Connector uses datatype of literal values to determine how they should be mapped to Kafka types. For more information on the supported datatypes, see Datatype mapping.

    The mapping can be overridden through the property "datatype", which can be specified per field. The value of datatype can be any of the xsd: types supported by the automatic mapping or a native Kafka type prefixed by native:, e.g., both xsd:long and native:long map to the long type in Kafka.

  • objectFields (objects array), optional, nested object mapping

    When native:object is used as a datatype value, provide a mapping for the nested object’s fields. If datatype is not provided, then native:object will be assumed.

    Nested objects support further nested objects with a limit of five levels of nesting.

  • startFromParent (integer), optional, default 0

    Start processing the property chain from the N-th parent instead of the root of the current nested object. 0 is the root of the current nested object, 1 is the parent of the nested object, 2 is the parent of the parent and so on.

valueFilter (string), optional, specifies the top-level value filter for the document

See also Entity filtering.

documentFilter (string), optional, specifies the top-level document filter for the document

See also Entity filtering.

Updating parameters at runtime

As mentioned above, the following connector parameters can be updated at runtime without having to rebuild the index:

  • kafkaNode

  • kafkaSSL

  • kafkaProducerConfig

  • kafkaCompressionType

  • kafkaPlainAuthUsername

  • kafkaPlainAuthPassword

  • bulkUpdateBatchSize

  • bulkUpdateRequestSize

  • kafkaPropagateConfig

This can be done by executing the following SPARQL update, here with examples for changing the user and password:

PREFIX kafka: <http://www.ontotext.com/connectors/kafka#>
PREFIX kafka-inst: <http://www.ontotext.com/connectors/kafka/instance#>
INSERT DATA {
kafka-inst:my_index kafka:updateConnector '''
  {
        "kafkaPlainAuthUsername": "foo"
        "kafkaPlainAuthPassword": "bar"
  }
''' .
}

Special field definitions

Nested objects

Nested objects are JSON objects that are used as values in the main document or other nested objects (up to five levels of nesting is possible). They are defined with the objectFields option.

Having the following data consisting of children and grandchildren relations:

<urn:John>
    a <urn:Person> ;
    <urn:name> "John" ;
    <urn:gender> <urn:Male> ;
    <urn:age> 60 ;
    <urn:hasSpouse> <urn:Mary> ;
    <urn:hasChild> <urn:Billy> ;
    <urn:hasChild> <urn:Annie> .

<urn:Mary>
    a <urn:Person> ;
    <urn:name> "Mary" ;
    <urn:gender> <urn:Female> ;
    <urn:age> 58 ;
    <urn:hasSpouse> <urn:John> ;
    <urn:hasChild> <urn:Billy> .

<urn:Eva>
    a <urn:Person> ;
    <urn:name> "Eva" ;
    <urn:gender> <urn:Female> ;
    <urn:age> 45 ;
    <urn:hasChild> <urn:Annie> .

<urn:Billy>
    a <urn:Person> ;
    <urn:name> "Billy" ;
    <urn:gender> <urn:Male> ;
    <urn:age> 35 ;
    <urn:hasChild> <urn:Tylor> ;
    <urn:hasChild> <urn:Melody> .

<urn:Annie>
    a <urn:Person> ;
    <urn:name> "Annie" ;
    <urn:gender> <urn:Female> ;
    <urn:age> 28 ;
    <urn:hasChild> <urn:Sammy> .

<urn:Tylor>
    a <urn:Person> ;
    <urn:name> "Tylor" ;
    <urn:gender> <urn:Male> ;
    <urn:age> 5 .

<urn:Melody>
    a <urn:Person> ;
    <urn:name> "Melody" ;
    <urn:gender> <urn:Female> ;
    <urn:age> 2 .

<urn:Sammy>
    a <urn:Person> ;
    <urn:name> "Sammy" ;
    <urn:gender> <urn:Male> ;
    <urn:age> 10 .

<urn:Male> <urn:label> "male" .

<urn:Female> <urn:label> "female" .

We can create a nested objects index that consists of children and grandchildren with their corresponding fields defining their gender and age:

{
  "fields": [
    {
      "fieldName": "name",
      "propertyChain": [
        "urn:name"
      ]
    },
    {
      "fieldName": "age",
      "propertyChain": [
        "urn:age"
      ],
      "datatype": "xsd:long"
    },
    {
      "fieldName": "hasSpouse",
      "propertyChain": [
        "urn:hasSpouse"
      ]
    },
    {
      "fieldName": "gender",
      "propertyChain": [
        "urn:gender",
        "urn:label"
      ]
    },
    {
      "fieldName": "children",
      "propertyChain": [
        "urn:hasChild"
      ],
      "datatype": "native:object",
      "objectFields": [
        {
          "fieldName": "id",
          "propertyChain": [
            "$self"
          ]
        },
        {
          "fieldName": "name",
          "propertyChain": [
            "urn:name"
          ]
        },
        {
          "fieldName": "age",
          "propertyChain": [
            "urn:age"
          ],
          "datatype": "xsd:long"
        },
        {
          "fieldName": "gender",
          "propertyChain": [
            "urn:gender",
            "urn:label"
          ]
        },
        {
          "fieldName": "children",
          "propertyChain": [
            "urn:hasChild"
          ],
          "objectFields": [
            {
              "fieldName": "id",
              "propertyChain": [
                "$self"
              ]
            },
            {
              "fieldName": "name",
              "propertyChain": [
                "urn:name"
              ]
            },
            {
              "fieldName": "age",
              "propertyChain": [
                "urn:age"
              ],
              "datatype": "xsd:long"
            }
          ]
        }
      ]
    },
    {
      "fieldName": "grandChildren",
      "valueFilter": "$this -> type in (<urn:Person>)",
      "propertyChain": [
        "urn:hasChild",
        "urn:hasChild"
      ],
      "datatype": "native:object",
      "objectFields": [
        {
          "fieldName": "id",
          "propertyChain": [
            "$self"
          ]
        },
        {
          "fieldName": "name",
          "propertyChain": [
            "urn:name"
          ]
        },
        {
          "fieldName": "age",
          "propertyChain": [
            "urn:age"
          ],
          "datatype": "xsd:long"
        },
        {
          "fieldName": "gender",
          "propertyChain": [
            "urn:gender",
            "urn:label"
          ]
        }
      ]
    },
  ],
  "types": [
    "urn:Person"
  ],
  "kafkaNode": ...,
  "kafkaTopic": ...
}

Copy fields

Often, it is convenient to synchronize one and the same data multiple times with different settings to accommodate for different use cases. The Kafka GraphDB Connector has explicit support for fields that copy their value from another field. This is achieved by specifying a single element in the property chain of the form @otherFieldName, where otherFieldName is another non-copy field. Take the following example:

...
  "fields": [
    {
      "fieldName": "grape",
      "facet": false,
      "propertyChain": [
        "http://www.ontotext.com/example/wine#madeFromGrape",
        "http://www.w3.org/2000/01/rdf-schema#label"
      ]
    },
    {
      "fieldName": "whiteGrape",
      "propertyChain": [
        "@grape"
      ]
    }
  ],
  "entityFilter": "?whiteGrape -> type = <wine:WhiteGrape>"
...

The snippet creates a field “grape” containing all grapes, and another field “whiteGrape”. Both fields are populated with the same values initially and “whiteGrape” is defined as a copy field that refers to the field “grape”. The field “whiteGrape” is additionally filtered so that only certain grape varieties will be synchronized.

Note

The connector handles copy fields in a more optimal way than specifying a field with exactly the same property chain as another field.

Multiple property chains per field

Sometimes, you have to work with data models that define the same concept (in terms of what you want to index in Kafka) with more than one property chain, e.g., the concept of “name” could be defined as a single canonical name, multiple historical names and some unofficial names. If you want to index these together as a single field in Kafka, you can define this as a multiple property chains field.

Fields with multiple property chains are defined as a set of separate virtual fields that will be merged into a single physical field when indexed. Virtual fields are distinguished by the suffix $xyz, where xyz is any alphanumeric sequence of convenience.For example, we can define the fields name$1 and name$2 like this:

...
  "fields": [
    {
      "fieldName": "name$1",
      "propertyChain": [
        "http://www.ontotext.com/example#canonicalName"
      ],
      "fieldName": "name$2",
      "propertyChain": [
        "http://www.ontotext.com/example#historicalName"
      ]
      ...
    },
...

The values of the fields name$1 and name$2 will be merged and synchronized to the field name in Kafka.

Note

You cannot mix suffixed and unsuffixed fields with the same same, e.g., if you defined myField$new and myField$old, you cannot have a field called just myField.

Filters and fields with multiple property chains

Filters can be used with fields defined with multiple property chains. Both the physical field values and the individual virtual field values are available:

  • Physical fields are specified without the suffix, e.g., ?myField

  • Virtual fields are specified with the suffix, e.g., ?myField$2 or ?myField$alt.

Note

Physical fields cannot be combined with parent() as their values come from different property chains. If you really need to filter the same parent level, you can rewrite parent(?myField) in (<urn:x>, <urn:y>) as parent(?myField$1) in (<urn:x>, <urn:y>) || parent(?myField$2) in (<urn:x>, <urn:y>) || parent(?myField$3) ... and surround it with parentheses if it is a part of a bigger expression.

Indexing language tags

The language tag of an RDF literal can be indexed by specifying a property chain, where the last element is the pseudo-IRI lang(). The property preceding lang() must lead to a literal value. For example:

PREFIX kafka: <http://www.ontotext.com/connectors/kafka#>
PREFIX kafka-inst: <http://www.ontotext.com/connectors/kafka/instance#>

INSERT DATA {
  kafka-inst:my_index kafka:createConnector '''
    {
      "kafkaNode": "localhost:9092",
      "kafkaTopic": "my_index",
      "types": ["http://www.ontotext.com/example#gadget"],
      "fields": [
         {
           "fieldName": "name",
           "propertyChain": [
             "http://www.ontotext.com/example#name"
           ]
         },
         {
           "fieldName": "nameLanguage",
           "propertyChain": [
             "http://www.ontotext.com/example#name",
             "lang()"
           ]
         }
      ],
    }
  ''' .
}

The above connector will index the language tag of each literal value of the property http://www.ontotext.com/example#name into the field nameLanguage.

Indexing named graphs

The named graph of a given value can be indexed by ending a property chain with the special pseudo-URI graph(). Indexing the named graph of the value instead of the value itself allows searching by named graph.

PREFIX kafka: <http://www.ontotext.com/connectors/kafka#>
PREFIX kafka-inst: <http://www.ontotext.com/connectors/kafka/instance#>

INSERT DATA {
  kafka-inst:my_index kafka:createConnector '''
    {
      "kafkaNode": "localhost:9092",
      "kafkaTopic": "my_index",
      "types": ["http://www.ontotext.com/example#gadget"],
      "fields": [
         {
           "fieldName": "name",
           "propertyChain": [
             "http://www.ontotext.com/example#name"
           ]
         },
         {
           "fieldName": "nameGraph",
           "propertyChain": [
             "http://www.ontotext.com/example#name",
             "graph()"
           ]
         }
      ],
    }
  ''' .
}

The above connector will index the named graph of each value of the property http://www.ontotext.com/example#name into the field nameGraph.

Wildcard literal indexing

In this mode, the last element of a property chain is a wildcard that will match any predicate that leads to a literal value. Use the special pseudo-IRI $literal as the last element of the property chain to activate it.

Note

Currently, it really means any literal, including literals with data types.

For example:

{
    "fields" : [ {
        "propertyChain" : [ "$literal" ],
        "fieldName" : "name"
    }, {
        "propertyChain" : [ "http://example.com/description", "$literal" ],
        "fieldName" : "description"
    }
    ...
}

See Indexing all literals for a detailed example.

Indexing the IRI of an entity

Sometimes you may need the IRI of each entity (e.g., http://www.ontotext.com/example/wine#Franvino from our small example dataset) indexed as a regular field. This can be achieved by specifying a property chain with a single property referring to the pseudo-IRI $self. For example:

PREFIX kafka: <http://www.ontotext.com/connectors/kafka#>
PREFIX kafka-inst: <http://www.ontotext.com/connectors/kafka/instance#>

INSERT DATA {
    kafka-inst:my_index kafka:createConnector '''
{
  "kafkaNode": "localhost:9092",
  "kafkaTopic": "my_index",
  "types": [
    "http://www.ontotext.com/example/wine#Wine"
  ],
  "fields": [
    {
      "fieldName": "entityId",
      "propertyChain": [
        "$self"
      ],
    },
    {
      "fieldName": "grape",
      "propertyChain": [
        "http://www.ontotext.com/example/wine#madeFromGrape",
        "http://www.w3.org/2000/01/rdf-schema#label"
      ]
    },
  ]
}
''' .
}

The above connector will index the IRI of each wine into the field entityId.

Note

Note that GraphDB will also use the IRI of each entity as the ID of each document in Kafka, which is represented by the field id.

Datatype mapping

The Kafka GraphDB Connector maps different types of RDF values to different types of Kafka values according to the basic type of the RDF value (IRI or literal) and the datatype of literals. The auto-detection uses the following mapping:

RDF value

RDF datatype

JSON type

IRI

n/a

string

literal

any type not explicitly mentioned below

string

literal

xsd:boolean

boolean

literal

xsd:double

number

literal

xsd:float

number

literal

xsd:long

number

literal

xsd:int

number

literal

xsd:dateTime

string in ISO format with time zone

literal

xsd:date

string in ISO format without time zone

literal

xsd:time

string in ISO format with time zone

literal

xsd:gYear

string in ISO format without time zone

literal

xsd:gYearMonth

string in ISO format without time zone

Note

For any given field, the automatic mapping uses the first value it sees. This works fine for clean datasets but might lead to problems, if your dataset has non-normalized data, e.g., the first value has no datatype but other values have.

It is therefore recommended to set datatype to a fixed value, e.g., xsd:date.

Please note that the commonly used xsd:integer and xsd:decimal datatypes are not indexed as numbers because they represent infinite precision numbers. You can override that by using the datatype option to cast to xsd:long, xsd:double, xsd:float as appropriate.

Date and time conversion

RDF and ISO use slightly different models for representing dates and times, even though the values might look very similar.

Years in RDF values use the XSD format and are era years, where positive values denote the common era and negative values denote years before the common era. There is no year zero.

Years in the ISO format are proleptic years, i.e., positive values denote years from the common era with any previous eras just going down by one mathematically so there is year zero.

In short:

  • year 2020 CE = year 2020 in XSD = year 2020 in ISO.

  • year 1 CE = year 1 in XSD = year 1 in ISO.

  • year 1 BCE = year -1 in XSD = year 0 in ISO.

  • year 2 BCE = year -2 in XSD = year -1 in ISO.

All years coming from RDF literals will be converted to ISO before sending to Kafka.

Both XSD and ISO date and time values support timezones. In addition to that, XSD defines the lack of a timezone as undetermined. Since we do not want to have any undetermined state in the indexing system, we define the undetermined time zone as UTC, i.e., "2020-02-14T12:00:00"^^xsd:dateTime is equivalent to "2020-02-14T12:00:00Z"^^xsd:dateTime (Z is the UTC timezone, also known as +00:00).

Also note that XSD dates and partial dates, e.g., xsd:gYear values, may have a timezone, which leads to additional complications. E.g., "2020+02:00"^^xsd:gYear (the year 2020 in the +02:00 timezone) will be normalized to 2019-12-31T22:00:00Z (the previous year!) if strict timezone adherence is followed. We have chosen to ignore the timezone on any values that do not have an associated time value, e.g.:

  • "2020-02-15+02:00"^^xsd:date

  • "2020-02+02:00"^^xsd:gYearMonth

  • "2020+02:00"^^xsd:gYear

All of the above will be treated as if they specified UTC as their timezone.

Entity filtering

The Kafka connector supports four kinds of entity filters used to fine-tune the set of entities and/or individual values for the configured fields, based on the field value. Entities and field values are synchronized to Kafka if, and only if, they pass the filter. The filters are similar to a FILTER() inside a SPARQL query but not exactly the same. In them, each configured field can be referred to by prefixing it with a ?, much like referring to a variable in SPARQL.

Types of filters

Top-level value filter

The top-level value filter is specified via valueFilter. It is evaluated prior to anything else when only the document ID is known and it may not refer to any field names but only to the special field $this that contains the current document ID. Failing to pass this filter removes the entire document early in the indexing process and it can be used to introduce more restrictions similar to the built-in filtering by type via the types property.

Top-level document filter

The top-level document filter is specified via documentFilter. This filter is evaluated last when all of the document has been collected and it decides whether to include the document in the index. It can be used to enforce global document restrictions, e.g., certain fields are required or a document needs to be indexed only if a certain field value meets specific conditions.

Per-field value filter

The per-field value filter is specified via valueFilter inside the field definition of the field whose values are to be filtered. The filter is evaluated while collecting the data for the field when each field value becomes available.

The variable that contains the field value is $this. Other field names can be used to filter the current field’s value based on the value of another field, e.g., $this > ?age will compare the current field value to the value of the field age (see also Two-variable filtering). Failing to pass the filter will remove the current field value.

On nested documents, the per-field value filter can be used to remove the entire nested document early in the indexing process, e.g., by checking the type of the nested document via next hop with rdf:type.

Nested document filter

The nested document filter is specified via documentFilter inside the field definition of the field that defines the root of a nested document. The filter is evaluated after the entire nested document has been collected. Failing to pass this filter removes the entire nested document.

Inside a nested document filter, the field names are within the context of the nested document and not within the context of the top-level document. For example, if we have a field children that defines a nested document, and we use a filter like ?age < "10"^^xsd:int, we will be referring to the field children.age. We can use the prefix $outer. one or more times to refer to field values from the outer document (from the viewpoint of the nested document). For example, $outer.age > "25"^^xsd:int will refer to the age field that is a sibling of the children field.

Other than the above differences, the nested document filter is equivalent to the top-level document filter from the viewpoint of the nested document.

See also Migrating from GraphDB 9.x.

Filter operators

The filter operators are used to test if the value of a given field satisfies a certain condition.

Field comparisons are done on original RDF values before they are converted to Kafka values using datatype mapping.

Operator

Meaning

?var in (value1, value2, ...)

Tests if the field var’s value is one of the specified values. Values are compared strictly unlike the similar SPARQL operator, i.e. for literals to match their datatype must be exactly the same (similar to how SPARQL sameTerm works). Values that do not match, are treated as if they were not present in the repository.

Example:
?status in ("active", "new")

?var not in (value1, value2, ...)

The negated version of the in-operator.

Example:
?status not in ("archived")

bound(?var)

Tests if the field var has a valid value. This can be used to make the field compulsory.

Example:
bound(?name)

isExplicit(?var)

Tests if the field var’s value came from an explicit statement. This will use the last element of the property chain. If you need to assert the explicit status of a previous property chain use parent(?var) as many times as needed.

Example:
isExplicit(?name)
?var = value (equal to)
?var != value (not equal to)
?var > value (greater than)
?var >= value (greater than or equal to)
?var < value (less than)
?var <= value (less than or equal to)
RDF value comparison operators that compare RDF values similarly to the equivalent SPARQL operators. The field var’s value will be compared to the specified RDF value. When comparing RDF values that are literals, their datatypes must be compatible, e.g., xsd:integer and xsd:long but not xsd:string and xsd:date. Values that do not match are treated as if they were not present in the repository.
Examples:
Given that height’s value is "150"^^xsd:int and dateOfBirth’s value is "1989-12-31"^^xsd:date, then:
?height = "150"^^xsd:int is true
?height = "150"^^xsd:long is true
?height = "150" is false

?height != "151"^^xsd:int is true
?height != "150" is true

?height > "150"^^xsd:int is false
?height >= "150"^^xsd:int is true
?dateOfBirth < "1990-01-01"^^xsd:date is true

regex(?var, "pattern")

or

regex(?var, "pattern", "i")

Tests if the field var’s value matches the given regular expression pattern.
If the “i” flag option is present, this indicates that the match operates in case-insensitive mode.
Values that do not match are treated as if they were not present in the repository.
Example:
regex(?name, "^mrs?", "i")

expr1 || expr2

or

expr1 or expr2

Logical disjunction of expressions expr1 and expr2.

Examples:
bound(?name) || bound(?company)
bound(?name) or bound(?company)

expr1 && expr2

or

expr1 and expr2

Logical conjunction of expressions expr1 and expr2.

Examples:
bound(?status) && ?status in ("active", "new")
bound(?status) and ?status in ("active", "new")

!expr

Logical negation of expression expr.

Example:
!bound(?company)

( expr )

Grouping of expressions

Example:
(bound(?name) or bound(?company)) && bound(?address)

Filter modifiers

In addition to the operators, there are some constructions that can be used to write filters based not on the values of a field but on values related to them:

Accessing the previous element in the chain

The construction parent(?var) is used for going to a previous level in a property chain. It can be applied recursively as many times as needed, e.g., parent(parent(parent(?var))) goes back in the chain three times. The effective value of parent(?var) can be used with the in or not in operator like this: parent(?company) in (<urn:a>, <urn:b>), or in the bound operator like this: parent(bound(?var)).

Accessing an element beyond the chain

The construction ?var -> uri (alternatively, ?var o uri or just ?var uri) is used to access additional values that are accessible through the property uri. In essence, this construction corresponds to the triple pattern value uri ?effectiveValue, where ?value is a value bound by the field var. The effective value of ?var -> uri can be used with the in or not in operator like this: ?company -> rdf:type in (<urn:c>, <urn:d>). It can be combined with parent() like this: parent(?company) -> rdf:type in (<urn:c>, <urn:d>). The same construction can be applied to the bound operator like this: bound(?company -> <urn:hasBranch>), or even combined with parent() like this: bound(parent(?company) -> <urn:hasGroup>).

The IRI parameter can be a full IRI within < > or the special string rdf:type (alternatively, just type), which will be expanded to http://www.w3.org/1999/02/22-rdf-syntax-ns#type.

Filtering by RDF graph

The construction graph(?var) is used for accessing the RDF graph of a field’s value. A typical use case is to sync only explicit values: graph(?a) not in (<http://www.ontotext.com/implicit>) but using isExplicit(?a) is the recommended way.

The construction can be combined with parent() like this: graph(parent(?a)) in (<urn:a>).

Filtering by language tags

The construction lang(?var) is used for accessing the language tag of field’s value (only RDF literals can have a language tag). The typical use case is to sync only values written in a given language: lang(?a) in ("de", "it", "no"). The construction can be combined with parent() and an element beyond the chain like this: lang(parent(?a) -> <http://www.w3.org/2000/01/rdf-schema#label>) in ("en", "bg"). Literal values without language tags can be filtered by using an empty tag: "".

Current context variable $this

The special field variable $this (and not ?this, ?$this, $?this) is used to refer to the current context. In the top-level value filter and the top-level document filter, it refers to the document. In the per-field value filter, it refers to the currently filtered field value. In the nested document filter, it refers to the nested document.

ALL() quantifier

In the context of document-level filtering, a match is true if at least one of potentially many field values match, e.g., ?location = <urn:Europe> would return true if the document contains { "location": ["<urn:Asia>", "<urn:Europe>"] }.

In addition to this, you can also use the ALL() quantifier when you need all values to match, e.g., ALL(?location) = <urn:Europe> would not match with the above document because <urn:Asia> does not match.

Entity filters and default values

Entity filters can be combined with default values in order to get more flexible behavior.

If a field has no values in the RDF database, the defaultValue is used. But if a field has some values, defaultValue is NOT used, even if all values are filtered out. See an example in Basic entity filter.

A typical use-case for an entity filter is having soft deletes, i.e., instead of deleting an entity, it is marked as deleted by the presence of a specific value for a given property.

Two-variable filtering

Besides comparing a field value to one or more constants or running an existential check on the field value, some use cases also require comparing the field value to the value of another field in order to produce the desired result. GraphDB solves this by supporting two-variable filtering in the per-field value filter, the top-level document filter, and the nested document filter.

Note

This type of filtering is not possible in the top-level value filter because the only variable that is available there is $this.

In the top-level document filter and the nested document filter, there are no restrictions as all values are available at the time of evaluation.

In the per-field value filter, two-variable filtering will reorder the defined fields such that values for other fields are already available when the current field’s filter is evaluated. For example, let’s say we defined a filter $this > ?salary for the field price. This will force the connector to process the field salary first, apply its per-field value filter if any, and only then start collecting and filtering the values for the field price.

Cyclic dependencies will be detected and reported as an invalid filter. For example, if in addition to the above we define a per-field value filter ?price > "1000"^^xsd:int for the field salary, a cyclic dependency will be detected as both price and salary will require the other field being indexed first.

Basic entity filter example

Given the following RDF data:

@prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#> .
@prefix example: <http://www.ontotext.com/example#> .

# the entity below will be synchronised because it has a matching value for city: ?city in ("London")
example:alpha
    rdf:type example:gadget ;
    example:name "John Synced" ;
    example:city "London" .

# the entity below will not be synchronised because it lacks the property completely: bound(?city)
example:beta
    rdf:type example:gadget ;
    example:name "Peter Syncfree" .

# the entity below will not be synchronized because it has a different city value:
# ?city in ("London") will remove the value "Liverpool" so bound(?city) will be false
example:gamma
    rdf:type example:gadget ;
    example:name "Mary Syncless" ;
    example:city "Liverpool" .

If you create a connector instance such as:

PREFIX kafka: <http://www.ontotext.com/connectors/kafka#>
PREFIX kafka-inst: <http://www.ontotext.com/connectors/kafka/instance#>

INSERT DATA {
  kafka-inst:my_index kafka:createConnector '''
    {
      "kafkaNode": "localhost:9092",
      "kafkaTopic": "my_index",
      "types": ["http://www.ontotext.com/example#gadget"],
      "fields": [
         {
           "fieldName": "name",
           "propertyChain": ["http://www.ontotext.com/example#name"]
         },
         {
           "fieldName": "city",
           "propertyChain": ["http://www.ontotext.com/example#city"],
           "valueFilter": "$this = \\"London\\""
         }
      ],
      "documentFilter": "bound(?city)"
    }
  ''' .
}

The entity :beta is not synchronized as it has no value for city.

To handle such cases, you can modify the connector configuration to specify a default value for city:

...
         {
           "fieldName": "city",
           "propertyChain": ["http://www.ontotext.com/example#city"],
           "defaultValue": "London"
         }
...
}

The default value is used for the entity :beta as it has no value for city in the repository. As the value is “London”, the entity is synchronized.

Advanced entity filter example

Sometimes, data represented in RDF is not well suited to map directly to non-RDF. For example, if you have news articles and they can be tagged with different concepts (locations, persons, events, etc.), one possible way to model this is a single property :taggedWith. Consider the following RDF data:

@prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> .
@prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#> .
@prefix example2: <http://www.ontotext.com/example2#> .

example2:Berlin
    rdf:type example2:Location ;
    rdfs:label "Berlin" .

example2:Mozart
    rdf:type example2:Person ;
    rdfs:label "Wolfgang Amadeus Mozart" .

example2:Einstein
    rdf:type example2:Person ;
    rdfs:label "Albert Einstein" .

example2:Cannes-FF
    rdf:type example2:Event ;
    rdfs:label "Cannes Film Festival" .

example2:Article1
    rdf:type example2:Article ;
    rdfs:comment "An article about a film about Einstein's life while he was a professor in Berlin." ;
    example2:taggedWith example2:Berlin ;
    example2:taggedWith example2:Einstein ;
    example2:taggedWith example2:Cannes-FF .

example2:Article2
    rdf:type example2:Article ;
    rdfs:comment "An article about Berlin." ;
    example2:taggedWith example2:Berlin .

example2:Article3
    rdf:type example2:Article ;
    rdfs:comment "An article about Mozart's life." ;
    example2:taggedWith example2:Mozart .

example2:Article4
    rdf:type example2:Article ;
    rdfs:comment "An article about classical music in Berlin." ;
    example2:taggedWith example2:Berlin ;
    example2:taggedWith example2:Mozart .

example2:Article5
    rdf:type example2:Article ;
    rdfs:comment "A boring article that has no tags." .

example2:Article6
    rdf:type example2:Article ;
    rdfs:comment "An article about the Cannes Film Festival in 2013." ;
    example2:taggedWith example2:Cannes-FF .

Assume you want to map this data to Kafka, so that the property example2:taggedWith x is mapped to separate fields taggedWithPerson and taggedWithLocation, according to the type of x (whereas we are not interested in Events). You can map taggedWith twice to different fields and then use an entity filter to get the desired values:

PREFIX kafka: <http://www.ontotext.com/connectors/kafka#>
PREFIX kafka-inst: <http://www.ontotext.com/connectors/kafka/instance#>

INSERT DATA {
  kafka-inst:my_index kafka:createConnector '''
    {
      "kafkaNode": "localhost:9092",
      "kafkaTopic": "my_index",
      "types": ["http://www.ontotext.com/example2#Article"],
      "fields": [
         {
            "fieldName": "comment",
            "propertyChain": ["http://www.w3.org/2000/01/rdf-schema#comment"]
         },
         {
           "fieldName": "taggedWithPerson",
           "propertyChain": ["http://www.ontotext.com/example2#taggedWith"],
           "valueFilter": "$this -> type = <http://www.ontotext.com/example2#Person>"
         },
         {
           "fieldName": "taggedWithLocation",
           "propertyChain": ["http://www.ontotext.com/example2#taggedWith"],
           "valueFilter": "$this -> type = <http://www.ontotext.com/example2#Location>"
         }
      ]
    }
  ''' .
}

Note

type is the short way to write <http://www.w3.org/1999/02/22-rdf-syntax-ns#type>.

The six articles in the RDF data above will be mapped as such:

Article IRI

Value in taggedWithPerson

Value in taggedWithLocation

Explanation

:Article1

:Einstein

:Berlin

:taggedWith has the values :Einstein, :Berlin and :Cannes-FF. The filter leaves only the correct values in the respective fields. The value :Cannes-FF is ignored as it does not match the filter.

:Article2

:Berlin

:taggedWith has the value :Berlin. After the filter is applied, only taggedWithLocation is populated.

:Article3

:Mozart

:taggedWith has the value :Mozart. After the filter is applied, only taggedWithPerson is populated

:Article4

:Mozart

:Berlin

:taggedWith has the values :Berlin and :Mozart. The filter leaves only the correct values in the respective fields.

:Article5

:taggedWith has no values. The filter is not relevant.

:Article6

:taggedWith has the value :Cannes-FF. The filter removes it as it does not match.

Overview of connector predicates

The following diagram shows a summary of all predicates that can administrate (create, drop, check status) connector instances or issue queries and retrieve results. It can be used as a quick reference of what a particular predicate needs to be attached to. Variables that are bound as a result of a query are shown in green, blank helper nodes are shown in blue, literals in red, and IRIs in orange. The predicates are represented by labeled arrows.

scale 0.85
left to right direction

skinparam activity {
  BackgroundColor<<BNode>> #D1E0FF
  BackgroundColor<<Var>> #D1FFD1
  BackgroundColor<<IRI>> #FFCC80
  BackgroundColor #FFE3E3
}

partition "Instance level" {
  "instance IRI" <<IRI>> -->[:createConnector] "JSON params"
  "instance IRI" -->[:dropConnector] "blank node\n or dummy value"
  "instance IRI" -->[:repairConnector] "blank node\n or dummy value"
  "instance IRI" -->[:connectorStatus] "?status" <<Var>>
}

Caveats

Producer sharing

The Kafka connector aims to minimize resource usage and provide smooth transactional operation. This is achieved by using a single Kafka producer object for each connector instance that is connected to the same Kafka broker node. This has the following benefits:

  • Memory consumption is reduced as each Kafka producer requires a certain amount of buffer memory.

  • A failed transaction in one Kafka connector instance will be reverted in all other Kafka connector instances together with the GraphDB transaction.

Due to the nature of Kafka producers it imposes a restriction as well:

  • All connector instances must use the same Kafka options, e.g., they must have the same values for the bulkUpdateBatchSize and kafkaCompressionType options.

Once you have created at least one Kafka connector instance and attempt to create another instance, the following are possible scenarios:

Different Kafka broker
  • The new connector instance specifies a different Kafka broker.

  • The connector instance will be created and a new Kafka producer will be instantiated.

Same Kafka broker + same Kafka options
  • The new connector instance specifies the same Kafka broker as one of the existing connectors and the SAME options as the existing connector.

  • The connector instance will be created and the existing Kafka producer will be reused

Same Kafka broker + different Kafka options
  • The new connector instance specifies the same Kafka broker as one of the existing connectors and DIFFERENT options than the existing connector.

  • The connector instance will NOT be created and an error explaining the reason will be thrown.

  • See Conflict resolution for possible workarounds.

Note

The Kafka broker for two connector instances is considered to be the same if at least one of the host/port pairs supplied via the kafkaNode option is the same.

Conflict resolution

When the attempt to create a new Kafka connector instance was denied because another instance was already created with different options, there are several possible ways to resolve the conflict:

Manual resolution
  • Examine the options of the new connector instance you want to create.

  • Make the options the same as of the existing connector instance.

Propagate the new options to the existing instances
  • Set the option kafkaPropagateConfig of the new instance to true.

  • The new options will be propagated to all existing instances that share the same Kafka broker node.

Force the allocation of a new producer
  • Set the option kafkaProducerId of the new instance to some non-empty identifier.

  • This will override the producer sharing mechanism and allocate a new producer associating it with the supplied producer ID.

  • The new connector will use the new options.

  • All existing instances will continue using their previous options.

Upgrading from previous versions

Migrating from GraphDB 9.x

GraphDB 10.0 introduces major changes to the filtering mechanism of the connectors. Existing connector instances will not be usable and attempting to use them for queries or updates will throw an error.

If your GraphDB 9.x (or older) connector definitions do not include an entity filter, you can simply repair them.

If your GraphDB 9.x (or older) connector definitions do include an entity filter with the entityFilter option, you need to rewrite the filter with one of the current filter types:

  1. Save your existing connector definition.

  2. Drop the connector instance.

  3. In general, most older connector filters can be easily rewritten using the per-field value filter and top-level document filter. Rewrite the filters as follows:

    Rule of thumb:

    • If you want to remove individual values, i.e., if the operand is not BOUND() –-> rewrite with per-field value filter.

    • If you want to remove entire documents, i.e., if the operand is BOUND() –> rewrite with top-level document filter.

    So if we take the example:

    ?location = <urn:Europe> AND BOUND(?location) AND ?type IN (<urn:Foo>, <urn:Bar>)
    

    It needs to be rewritten like this:

    • Per-field rule on field location: $this = <urn:Europe>

    • Per-field rule on field type: $this IN (<urn:Foo>, <urn:Bar>)

    • Top-level document filter: BOUND(?location)

  4. Recreate the connector instance using the new definition.