Kafka Sink Connector

Note

Despite having a similar name, the Kafka Sink connector is not a GraphDB connector.

Overview

Modern businesses have an ever-increasing need to integrate data coming from multiple, diverse systems. Automating the continuous build and update of knowledge graphs from incoming data streams can be cumbersome for a number of reasons, such as verbose functional code, numerous transactions per update, the suboptimal usability of GraphDB’s RDF mapping language, and the lack of a direct way to stream updates to knowledge graphs.

GraphDB’s open-source Kafka Sink connector, which supports smart updates with SPARQL templates, addresses this by reducing the amount of code needed to transform raw event data, thus helping automate knowledge graph updates. It runs as a separate process and therefore does not affect database sizing. The connector can be customized to the user’s specific business logic and requires no GraphDB downtime during configuration.

With it, users can push update messages to Kafka, after which a Kafka consumer processes them and applies the updates in GraphDB.

Setup

Important

Before setting up the connector, make sure to have JDK 11 installed.

  1. Install Kafka 2.8.0 or newer.

  2. The Kafka Sink connector can be deployed in a Docker container. To install it and verify that it is working correctly, follow the GitHub README instructions.
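
Once the deployment from the README is up, you can optionally verify that the Kafka Connect worker’s REST API is reachable and that the GraphDB sink plugin is installed. This is a quick sanity check that assumes the worker listens on port 8083, the port used in the examples below:

    # List the connector plugins available on the Kafka Connect worker;
    # com.ontotext.kafka.GraphDBSinkConnector should appear in the output.
    curl -s http://localhost:8083/connector-plugins

    # List the connectors configured so far (empty on a fresh deployment).
    curl -s http://localhost:8083/connectors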

Update types

The Kafka Sink connector supports three types of updates: simple add, replace graph, and smart update with a DELETE/INSERT template. A given Kafka topic is configured to accept updates in a predefined mode and format. The format must be one of the supported RDF formats (see graphdb.update.rdf.format in the configuration properties below).

Simple add

This is a simple INSERT operation where no document identifiers are needed, and new data is always added as is. All you need to provide is the new RDF data to be added. The update is performed as follows:

  • The Kafka topic is configured to only add data.

  • The Kafka key is irrelevant but it is recommended to use a unique ID, e.g. a random UUID.

  • The Kafka value is the new RDF data to add.

Let’s see how it works.

  1. Start GraphDB on the same or a different machine.

  2. In GraphDB, create a repository called “kafka-test”.

  3. To deploy the connector, execute in the project’s docker-compose directory:

    sudo docker-compose up --scale graphdb=0
    

    where --scale graphdb=0 means that the graphdb service will not be started by Docker Compose; GraphDB must be running separately, outside the container.

  4. Next, we will configure the Kafka sink connector that will add data into the repository. In the directory of the Kafka sink connector, execute:

    curl http://localhost:8083/connectors \
        -H 'Content-Type: application/json' \
        --data '{"name":"kafka-sink-graphdb-add",
            "config":{
             "graphdb.server.url":"http://graphdb.example.com:7200",
             "connector.class":"com.ontotext.kafka.GraphDBSinkConnector",
             "key.converter":"com.ontotext.kafka.convert.DirectRDFConverter",
             "value.converter":"com.ontotext.kafka.convert.DirectRDFConverter",
             "value.converter.schemas.enable":"false",
             "topics":"gdb-add",
             "tasks.max":1,
             "offset.storage.file.filename":"/tmp/storage-add",
             "graphdb.server.repository":"kafka-test",
             "graphdb.batch.size":64,
             "graphdb.batch.commit.limit.ms":1000,
             "graphdb.auth.type":"NONE",
             "graphdb.update.type":"ADD",
             "graphdb.update.rdf.format":"nq"}}'
    

    with the following important parameters:

    • topics, which can be one of the following:

      • the name of the topic from which the connector will be reading the documents, here gdb-add

      • a comma-separated list of topics

    • graphdb.server.url: the URL of the GraphDB server, replace the sample value http://graphdb.example.com:7200 with the actual URL

      Important

      Since GraphDB is running outside the Kafka Sink Docker container, using localhost in graphdb.server.url will not work. Use a hostname or IP that is visible from within the container.

    • graphdb.server.repository: the GraphDB repository in which the connector will write the documents, here kafka-test

    • graphdb.update.type: the type of the update, here ADD

    This will create the add data connector, which will read documents from the gdb-add topic and send them to the kafka-test repository on the respective GraphDB server.

    Note

    One connector can work with only one configuration. If multiple configurations are supplied, Kafka Sink will pick a single one and run it. If more than one connector is needed, each must be created and configured separately.

  5. For the purposes of the example, we will also create a test Kafka producer that will write to the respective Kafka topic. In the Kafka installation directory, execute:

    bin/kafka-console-producer.sh --bootstrap-server localhost:19092 --topic gdb-add
    
  6. To add some RDF data in the producer, paste this into the same window, and press Enter.

    <http://example/subject> <http://example/predicate> "This is an example of adding data" <http://example/graph> .
    
  7. In the Workbench SPARQL editor of the “kafka-test” repository, run the query:

    SELECT * WHERE {
        GRAPH ?g {
            ?s ?p ?o
        }
    }
    
  8. The RDF data that we just added via the producer should be returned as a result.
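
If the data does not show up, two quick command-line checks can help. The first uses the standard Kafka Connect REST API to confirm that the connector task is running; the second runs the same SELECT query against the repository’s SPARQL endpoint instead of the Workbench. As before, replace the sample server URL http://graphdb.example.com:7200 with your actual one:

    # Confirm that the connector and its task are in the RUNNING state.
    curl -s http://localhost:8083/connectors/kafka-sink-graphdb-add/status

    # Run the same SELECT query directly against the repository's SPARQL endpoint.
    curl -s -G http://graphdb.example.com:7200/repositories/kafka-test \
        -H 'Accept: text/csv' \
        --data-urlencode 'query=SELECT * WHERE { GRAPH ?g { ?s ?p ?o } }'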

Replace graph

In this update type, a document (the smallest update unit) is defined as the contents of a named graph. Thus, to perform an update, the following information must be provided:

  • The IRI of the named graph – the document ID

  • The new RDF contents of the named graph – the document contents

The update is performed as follows:

  • The Kafka topic is configured for replace graph.

  • The Kafka key defines the named graph to update.

  • The Kafka value defines the contents of the named graph.

Let’s try it out.

  1. We already have the Docker container with the Kafka sink connector running, and have created the “kafka-test” repository.

  2. Now, let’s configure the Kafka sink connector that will replace data in a named graph. In the directory of the Kafka sink connector, execute:

    curl http://localhost:8084/connectors \
        -H 'Content-Type: application/json' \
        --data '{"name":"kafka-sink-graphdb-replace",
            "config":{
             "graphdb.server.url":"http://graphdb.example.com:7200",
             "connector.class":"com.ontotext.kafka.GraphDBSinkConnector",
             "key.converter":"com.ontotext.kafka.convert.DirectRDFConverter",
             "value.converter":"com.ontotext.kafka.convert.DirectRDFConverter",
             "value.converter.schemas.enable":"false",
             "topics":"gdb-replace",
             "tasks.max":1,
             "offset.storage.file.filename":"/tmp/storage-replace",
             "graphdb.server.repository":"kafka-test",
             "graphdb.batch.size":64,
             "graphdb.batch.commit.limit.ms":1000,
             "graphdb.auth.type":"NONE",
             "graphdb.update.type":"REPLACE_GRAPH",
             "graphdb.update.rdf.format":"nq"}}'
    

    with the same important parameters as in the add data example above.

    This will configure the replace graph connector, which will read data from the gdb-replace topic and send it to the kafka-test repository on the respective GraphDB server.

    Note

    Here, we have created the connector on a different URL from the previous one - http://localhost:8084/connectors. If you want to create it on the same URL (http://localhost:8083/connectors), you need to first delete the existing connector:

    curl -X DELETE http://localhost:8083/connectors/kafka-sink-graphdb-add
    
  3. To replace data in a specific named graph, we need to provide:

    • the name of the graph as key of the Kafka message in the Kafka topic

    • the new data for the replace as value of the Kafka message

    Thus, we need to modify the producer to create a key-value message. In the Kafka installation directory, execute:

    bin/kafka-console-producer.sh --bootstrap-server localhost:19092 --topic gdb-replace \
        --property parse.key=true --property key.separator="-"
    
  4. To replace the data in the graph, paste this into the same window, and press Enter.

    http://example/graph-<http://example/subject> <http://example/predicate> "Successfully replaced graph" <http://example/graph> .
    

    The part of the message before the key separator (http://example/graph) is the Kafka key, i.e., the IRI of the named graph to replace; the part after the separator is the new contents of the graph.

  5. To see the replaced data, run the query from above in the Workbench SPARQL editor of the “kafka-test” repository:

    SELECT * WHERE {
        GRAPH ?g {
            ?s ?p ?o
        }
    }
    
  6. The replaced data in the graph should be returned as a result.
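
Because the console producer reads messages from standard input, the keyed message from step 4 can also be sent non-interactively, which is convenient for scripting. The literal below is just another illustrative value:

    # Send a single keyed message to the gdb-replace topic without an interactive session.
    printf '%s\n' 'http://example/graph-<http://example/subject> <http://example/predicate> "Replaced non-interactively" <http://example/graph> .' \
        | bin/kafka-console-producer.sh --bootstrap-server localhost:19092 --topic gdb-replace \
            --property parse.key=true --property key.separator="-"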

DELETE/INSERT template

In this update type, a document is defined as all triples for a given document identifier, according to a predefined schema. The schema is described as a SPARQL DELETE/INSERT template that is filled in from the provided data at update time. The following elements are involved in an update:

  • The SPARQL update template - predefined in advance, not provided at update time:

    • It can be a DELETE WHERE update that only deletes the previous version of the document; the new data is then inserted as is.

    • It can be a DELETE INSERT WHERE update that deletes the previous version of the document and also inserts additional triples, e.g. timestamp information.

  • The IRI of the updated document

  • The new RDF contents of the updated document

The update is performed as follows:

  • The Kafka topic is configured for a specific template.

  • The Kafka key of the message holds the value to be used for the ?id parameter in the template’s body - the template binding.

  • The Kafka value defines the new data to be added with the update.

Important

One SPARQL template typically corresponds to a single document type and is used by a single Kafka sink.

Let’s see how it works.

  1. As with the previous update, we already have the Docker container with the Kafka Sink connector running, and will again be using the “kafka-test” repository.

  2. With this type of update, we need to first create a SPARQL template for the data update.

    1. In the Workbench, go to Setup ‣ SPARQL Templates ‣ Create new SPARQL template.

    2. Enter a template IRI (required), e.g., http://example.com/my-template.

    3. As template body, insert:

      DELETE {
          graph ?g { ?id ?p ?oldValue . }
      } INSERT {
          graph ?g { ?id ?p "Successfully updated example" . }
      } WHERE {
          graph ?g { ?id ?p ?oldValue . }
      }
      

      This simple template looks in all graphs for a given subject, ?id, which we will supply later when executing the update. The template then changes the object of all triples containing this subject to a new value - "Successfully updated example".

    4. Save it, after which it will appear in the templates list.


  3. Now we need to configure the Kafka sink connector that will update data in a named graph. In the directory of the Kafka sink connector, execute:

    curl http://localhost:8085/connectors \
        -H 'Content-Type: application/json' \
        --data '{"name":"kafka-sink-graphdb-update",
            "config":{
             "graphdb.server.url":"http://graphdb.example.com:7200",
             "connector.class":"com.ontotext.kafka.GraphDBSinkConnector",
             "key.converter":"com.ontotext.kafka.convert.DirectRDFConverter",
             "value.converter":"com.ontotext.kafka.convert.DirectRDFConverter",
             "value.converter.schemas.enable":"false",
             "topics":"gdb-update",
             "tasks.max":1,
             "offset.storage.file.filename":"/tmp/storage-update",
             "graphdb.server.repository":"kafka-test",
             "graphdb.batch.size":64,
             "graphdb.batch.commit.limit.ms":1000,
             "graphdb.auth.type":"NONE",
             "graphdb.update.type":"SMART_UPDATE",
             "graphdb.update.rdf.format":"nq",
             "graphdb.template.id":"http://example.com/my-template"}}'

    Here, we specify the template IRI that we created in the Workbench earlier via "graphdb.template.id":"http://example.com/my-template"; this is the template that the sink connector will execute.

    Note

    As in the previous example, we have created the connector on a different URL - http://localhost:8085/connectors. If you want to create it on a URL that is already in use, you first need to delete the connector on it, as shown above.

  4. To execute a SPARQL update, we need to provide the binding for the template’s ?id parameter. It is passed as the key of the Kafka message, and the data to be added is passed as the value.

    In the Kafka installation directory, execute:

    bin/kafka-console-producer.sh --bootstrap-server localhost:19092 --topic gdb-update \
        --property parse.key=true --property key.separator="-"
    
  5. To execute an update, paste this into the same window, and press Enter.

    http://example/graph-<http://example/subject> <http://example/predicate> "Now we can make SPARQL updates" <http://example/graph> .
    
  6. To see the updated data, run the query from above in the Workbench SPARQL editor of the “kafka-test” repository:

    SELECT * WHERE {
        GRAPH ?g {
            ?s ?p ?o
        }
    }
    
  7. The updated data in the graph should be returned as a result.
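
When you are done experimenting, the three demo connectors can be removed through the same Kafka Connect REST API that was used to create them, on the ports where each one was registered:

    curl -X DELETE http://localhost:8083/connectors/kafka-sink-graphdb-add
    curl -X DELETE http://localhost:8084/connectors/kafka-sink-graphdb-replace
    curl -X DELETE http://localhost:8085/connectors/kafka-sink-graphdb-update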

Configuration properties

The following properties are used to configure the Kafka Sink connector:

  • name (String): Globally unique name to use for this connector.

  • connector.class (String): Name or alias of the class for this connector. Must be a subclass of org.apache.kafka.connect.connector.Connector. If the connector is org.apache.kafka.connect.file.FileStreamSinkConnector, you can either specify this full name, or use FileStreamSink or FileStreamSinkConnector to make the configuration a bit shorter.

  • key.converter (Class): Converter class used to convert between Kafka Connect format and the serialized form that is written to Kafka. This controls the format of the keys in messages written to or read from Kafka, and since this is independent of connectors, it allows any connector to work with any serialization format. Default is NULL.

  • value.converter (Class): Converter class used to convert between Kafka Connect format and the serialized form that is written to Kafka. This controls the format of the values in messages written to or read from Kafka, and since this is independent of connectors, it allows any connector to work with any serialization format. Default is NULL.

  • topics (List): Comma-separated list of topics to consume.

  • tasks.max (Integer): Maximum number of tasks to use for this connector. Default is 1.

  • graphdb.server.url (String): The URL of the GraphDB server.

  • graphdb.server.repository (String): The GraphDB repository where the connector will write the documents.

  • graphdb.batch.size (Integer): The maximum number of documents sent from Kafka to GraphDB in one transaction.

  • graphdb.auth.type (NONE, BASIC, CUSTOM): The authentication type.

  • graphdb.auth.basic.username (String): The username for basic authentication.

  • graphdb.auth.basic.password (String): The password for basic authentication.

  • graphdb.auth.header.token (String): The GraphDB authentication token.

  • graphdb.update.type (ADD, REPLACE_GRAPH, SMART_UPDATE): The type of the transaction.

  • graphdb.update.rdf.format (RDF, RDFS, XML, OWL, NT, TTL, TTLS, N3, TRIX, TRIG, TRIGS, BRF, NQ, JSONLD, NDJSON, NDJSONLD, JSONL, RJ, XHTML, HTML, HDT): The format of the documents sent from Kafka to GraphDB. Default is NONE.

  • graphdb.batch.commit.limit.ms (Long): The timeout, in milliseconds, applied per batch that is not full before it is committed. Default is 3000.

  • errors.tolerance (NONE, ALL): Behavior for tolerating errors during connector operation. NONE is the default value and means that any error results in an immediate connector task failure; ALL changes the behavior to skip over problematic records.

  • errors.deadletterqueue.topic.name (String): The name of the Kafka topic used to store failed records. Default is blank.

  • errors.deadletterqueue.topic.replication.factor (Short): Replication factor used to create the dead letter queue topic when it does not already exist. Default is 3.

  • errors.retry.timeout (Long): The maximum duration in milliseconds for which a failed operation will be retried. Default is 0, which means no retries will be attempted. Use -1 for infinite retries.

  • errors.retry.delay.max.ms (Long): The maximum duration in milliseconds between consecutive retry attempts.

  • bootstrap.servers (List): The addresses of the Kafka brokers. Defaults are localhost:9092, localhost:9093, localhost:9093.
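
To illustrate how the authentication and error-handling properties fit together, here is a sketch of a connector configuration that uses basic authentication, tolerates problematic records, and routes them to a dead letter queue. The connector name, topic names, and credentials are placeholders, and the errors.tolerance value is given in lowercase, matching the form used in the Kafka Connect documentation; adjust the URLs and repository to your setup:

    curl http://localhost:8083/connectors \
        -H 'Content-Type: application/json' \
        --data '{"name":"kafka-sink-graphdb-secured",
            "config":{
             "connector.class":"com.ontotext.kafka.GraphDBSinkConnector",
             "key.converter":"com.ontotext.kafka.convert.DirectRDFConverter",
             "value.converter":"com.ontotext.kafka.convert.DirectRDFConverter",
             "value.converter.schemas.enable":"false",
             "topics":"gdb-add-secured",
             "tasks.max":1,
             "graphdb.server.url":"http://graphdb.example.com:7200",
             "graphdb.server.repository":"kafka-test",
             "graphdb.update.type":"ADD",
             "graphdb.update.rdf.format":"nq",
             "graphdb.batch.size":64,
             "graphdb.batch.commit.limit.ms":1000,
             "graphdb.auth.type":"BASIC",
             "graphdb.auth.basic.username":"kafka-user",
             "graphdb.auth.basic.password":"kafka-password",
             "errors.tolerance":"all",
             "errors.deadletterqueue.topic.name":"gdb-dead-letter",
             "errors.deadletterqueue.topic.replication.factor":1,
             "errors.retry.timeout":5000,
             "errors.retry.delay.max.ms":1000}}'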