Kafka Sink Connector¶
Note
Despite having a similar name, the Kafka Sink connector is not a GraphDB connector.
Overview¶
Modern businesses have an ever-increasing need to integrate data coming from multiple, diverse systems. Automating the continuous build and update of knowledge graphs from incoming data streams can be cumbersome for a number of reasons, such as verbose functional code, numerous transactions per update, the suboptimal usability of GraphDB’s RDF mapping language, and the lack of a direct way to stream updates to knowledge graphs.
GraphDB’s open-source Kafka Sink connector, which supports smart updates with SPARQL templates, solves this issue by reducing the amount of code needed for raw event data transformation and thus contributing to the automation of knowledge graph updates. It runs as a separate process, so it does not add to the database’s sizing requirements. The connector can be customized to the user’s specific business logic, and requires no GraphDB downtime during configuration.
With it, users can push update messages to Kafka, after which a Kafka consumer processes them and applies the updates in GraphDB.
Setup¶
Important
Before setting up the connector, make sure to have JDK 11 installed.
Install Kafka 2.8.0 or newer.
The Kafka Sink connector can be deployed in a Docker container. To install it and verify that it is working correctly, follow the GitHub README instructions.
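Once the Kafka Connect worker is up, you can check that it has picked up the connector plugin by querying its REST API. A minimal check, assuming the worker’s REST interface is exposed on localhost:8083, as in the examples below:

curl http://localhost:8083/connector-plugins

The response should list com.ontotext.kafka.GraphDBSinkConnector among the available plugins.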
Update types¶
The Kafka Sink connector supports three types of updates: simple add, replace graph, and smart update with a DELETE/INSERT template. A given Kafka topic is configured to accept updates in a predefined mode and format. The format must be one of the RDF formats.
Simple add¶
This is a simple INSERT operation where no document identifiers are needed, and new data is always added as is. All you need to provide is the new RDF data that is to be added. The update is performed as follows:
The Kafka topic is configured to only add data.
The Kafka key is irrelevant but it is recommended to use a unique ID, e.g. a random UUID.
The Kafka value is the new RDF data to add.
Let’s see how it works.
Start GraphDB on the same or a different machine.
In GraphDB, create a repository called “kafka-test”.
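If you prefer the command line, the repository can also be created through GraphDB’s REST API instead of the Workbench. A minimal sketch, assuming GraphDB is reachable at localhost:7200 from where you run the command and that you have prepared a repository configuration file, here hypothetically named repo-config.ttl, for the “kafka-test” repository:

curl -X POST http://localhost:7200/rest/repositories \
  -H 'Content-Type: multipart/form-data' \
  -F "config=@repo-config.ttl"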
To deploy the connector, execute the following in the project’s docker-compose directory:

sudo docker-compose up --scale graphdb=0

where graphdb=0 denotes that GraphDB must be started outside of the Docker container.

Next, we will configure the Kafka Sink connector that will add data into the repository. In the directory of the Kafka Sink connector, execute:
curl http://localhost:8083/connectors \
  -H 'Content-Type: application/json' \
  --data '{
    "name": "kafka-sink-graphdb-add",
    "config": {
      "graphdb.server.url": "http://graphdb.example.com:7200",
      "connector.class": "com.ontotext.kafka.GraphDBSinkConnector",
      "key.converter": "com.ontotext.kafka.convert.DirectRDFConverter",
      "value.converter": "com.ontotext.kafka.convert.DirectRDFConverter",
      "value.converter.schemas.enable": "false",
      "topics": "gdb-add",
      "tasks.max": 1,
      "offset.storage.file.filename": "/tmp/storage-add",
      "graphdb.server.repository": "kafka-test",
      "graphdb.batch.size": 64,
      "graphdb.batch.commit.limit.ms": 1000,
      "graphdb.auth.type": "NONE",
      "graphdb.update.type": "ADD",
      "graphdb.update.rdf.format": "nq"
    }
  }'
with the following important parameters:

topics: the topic(s) from which the connector will be reading the documents. This can be a single topic name, here gdb-add, or a comma-separated list of topics.

graphdb.server.url: the URL of the GraphDB server. Replace the sample value http://graphdb.example.com:7200 with the actual URL.

Important

Since GraphDB is running outside the Kafka Sink Docker container, using localhost in graphdb.server.url will not work. Use a hostname or IP address that is visible from within the container.

graphdb.server.repository: the GraphDB repository in which the connector will write the documents, here kafka-test.

graphdb.update.type: the type of the update, here ADD.
This will create the add data connector, which will read documents from the gdb-add topic and send them to the kafka-test repository on the respective GraphDB server.

Note

One connector can work with only one configuration. If multiple configurations are added, Kafka Sink will pick a single configuration and run it. If more than one connector is needed, each must be created and configured accordingly.
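Before producing any messages, you can optionally verify that the connector was registered and is running by calling the standard Kafka Connect REST API (assuming the worker from the example above on localhost:8083):

curl http://localhost:8083/connectors/kafka-sink-graphdb-add/status

The response should report both the connector and its task in the RUNNING state.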
For the purposes of the example, we will also create a test Kafka producer that will write to the respective Kafka topic. In the Kafka installation directory, execute:
bin/kafka-console-producer.sh --bootstrap-server localhost:19092 --topic gdb-add
To add some RDF data via the producer, paste the following into the same window and press Enter.
<http://example/subject> <http://example/predicate> "This is an example of adding data" <http://example/graph> .
In the Workbench SPARQL editor of the “kafka-test” repository, run the query:
SELECT * WHERE { GRAPH ?g { ?s ?p ?o } }
The RDF data that we just added via the producer should be returned in the results.
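The same check can be performed without the Workbench by sending the query to the repository’s SPARQL endpoint over HTTP. A sketch, assuming the GraphDB server URL from the connector configuration:

curl -H 'Accept: application/sparql-results+json' \
  --data-urlencode 'query=SELECT * WHERE { GRAPH ?g { ?s ?p ?o } }' \
  http://graphdb.example.com:7200/repositories/kafka-test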
Replace graph¶
In this update type, a document (the smallest update unit) is defined as the contents of a named graph. Thus, to perform an update, the following information must be provided:
The IRI of the named graph – the document ID
The new RDF contents of the named graph – the document contents
The update is performed as follows:
The Kafka topic is configured for replace graph.
The Kafka key defines the named graph to update.
The Kafka value defines the contents of the named graph.
Let’s try it out.
We already have the Docker container with the Kafka sink connector running, and have created the “kafka-test” repository.
Now, let’s configure the Kafka sink connector that will replace data in a named graph. In the directory of the Kafka sink connector, execute:
curl http://localhost:8084/connectors \
  -H 'Content-Type: application/json' \
  --data '{
    "name": "kafka-sink-graphdb-replace",
    "config": {
      "graphdb.server.url": "http://graphdb.example.com:7200",
      "connector.class": "com.ontotext.kafka.GraphDBSinkConnector",
      "key.converter": "com.ontotext.kafka.convert.DirectRDFConverter",
      "value.converter": "com.ontotext.kafka.convert.DirectRDFConverter",
      "value.converter.schemas.enable": "false",
      "topics": "gdb-replace",
      "tasks.max": 1,
      "offset.storage.file.filename": "/tmp/storage-replace",
      "graphdb.server.repository": "kafka-test",
      "graphdb.batch.size": 64,
      "graphdb.batch.commit.limit.ms": 1000,
      "graphdb.auth.type": "NONE",
      "graphdb.update.type": "REPLACE_GRAPH",
      "graphdb.update.rdf.format": "nq"
    }
  }'
with the same important parameters as in the add data example above.
This will configure the replace graph connector, which will read data from the gdb-replace topic and send it to the kafka-test repository on the respective GraphDB server.

Note

Here, we have created the connector on a different URL from the previous one, http://localhost:8084/connectors. If you want to create it on the same URL (http://localhost:8083/connectors), you need to first delete the existing connector:

curl -X DELETE http://localhost:8083/connectors/kafka-sink-graphdb-add
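To see which connectors are currently registered on a worker before deleting or reusing a URL, you can list them via the same REST API, e.g. for the first worker:

curl http://localhost:8083/connectors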
To replace data in a specific named graph, we need to provide:
the name of the graph as key of the Kafka message in the Kafka topic
the new data for the replace as value of the Kafka message
Thus, we need to modify the producer to create a key-value message. In the Kafka installation directory, execute:
bin/kafka-console-producer.sh --bootstrap-server localhost:19092 --topic gdb-replace \
  --property parse.key=true --property key.separator="-"
To replace the data in the graph, paste this into the same window, and press Enter.
http://example/graph-<http://example/subject> <http://example/predicate> "Successfully replaced graph" <http://example/graph> .
The key must be the IRI of the named graph whose contents are being replaced, here http://example/graph.

To see the replaced data, run the query from above in the Workbench SPARQL editor of the “kafka-test” repository:
SELECT * WHERE { GRAPH ?g { ?s ?p ?o } }
The replaced data in the graph should be returned in the results.
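To confirm that the contents of the named graph are replaced rather than appended to, you can send a second message with the same key (the same graph IRI) and different data in the still-open producer window, then re-run the query. Only the triples from the latest message should remain in the graph:

http://example/graph-<http://example/subject> <http://example/predicate> "Replaced the graph again" <http://example/graph> .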
DELETE/INSERT template¶
In this update type, a document is defined as all triples for a given document identifier according to a predefined schema. The schema is described as a SPARQL DELETE/INSERT template that is filled in from the provided data at update time. The following are needed to perform an update:

The SPARQL update template, which must be predefined and is not provided at update time. It can be:

a DELETE WHERE update that only deletes the previous version of the document, with the new data inserted as is;

a DELETE INSERT WHERE update that deletes the previous version of the document and adds additional triples, e.g. timestamp information.

The IRI of the updated document

The new RDF contents of the updated document
The update is performed as follows:
The Kafka topic is configured for a specific template.
The Kafka key of the message holds the value to be used for the ?id parameter in the template’s body (the template binding).

The Kafka value defines the new data to be added with the update.
Important
One SPARQL template typically corresponds to a single document type and is used by a single Kafka sink.
Let’s see how it works.
As with the previous update, we already have the Docker container with the Kafka Sink connector running, and will again be using the “kafka-test” repository.
With this type of update, we need to first create a SPARQL template for the data update.
In the Workbench, go to Setup ‣ SPARQL Templates.

Enter a template IRI (required), e.g., http://example.com/my-template.

As template body, insert:
DELETE {
    graph ?g {
        ?id ?p ?oldValue .
    }
} INSERT {
    graph ?g {
        ?id ?p "Successfully updated example" .
    }
} WHERE {
    graph ?g {
        ?id ?p ?oldValue .
    }
}
This simple template will look for a given subject (?id, which we will need to supply later when executing the update) in all graphs. The template will then update the object in all triples containing this subject to a new value, "Successfully updated example".

Save it, after which it will appear in the templates list.
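For illustration only: with ?id bound to <http://example/subject>, the template is roughly equivalent to running the following update manually against the repository’s statements endpoint (the connector performs this binding and execution for you; the server URL is the one assumed in the earlier examples):

curl -X POST http://graphdb.example.com:7200/repositories/kafka-test/statements \
  --data-urlencode 'update=DELETE { graph ?g { <http://example/subject> ?p ?oldValue . } } INSERT { graph ?g { <http://example/subject> ?p "Successfully updated example" . } } WHERE { graph ?g { <http://example/subject> ?p ?oldValue . } }'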
Now we need to configure the Kafka sink connector that will update some data in a named graph. In the directory of the Kafka sink connector, execute:
curl http://localhost:8085/connectors \
  -H 'Content-Type: application/json' \
  --data '{
    "name": "kafka-sink-graphdb-update",
    "config": {
      "graphdb.server.url": "http://graphdb.example.com:7200",
      "connector.class": "com.ontotext.kafka.GraphDBSinkConnector",
      "key.converter": "com.ontotext.kafka.convert.DirectRDFConverter",
      "value.converter": "com.ontotext.kafka.convert.DirectRDFConverter",
      "value.converter.schemas.enable": "false",
      "topics": "gdb-update",
      "tasks.max": 1,
      "offset.storage.file.filename": "/tmp/storage-update",
      "graphdb.server.repository": "kafka-test",
      "graphdb.batch.size": 64,
      "graphdb.batch.commit.limit.ms": 1000,
      "graphdb.auth.type": "NONE",
      "graphdb.update.type": "SMART_UPDATE",
      "graphdb.update.rdf.format": "nq",
      "graphdb.template.id": "http://example.com/my-template"
    }
  }'

Here, we specify the template IRI that we created in the Workbench earlier with "graphdb.template.id": "http://example.com/my-template"; this is the template the sink connector will execute.

Note
As in the previous example, we have created the connector on a different URL, http://localhost:8085/connectors. If you want to create it on a URL that is already in use, you first need to delete the connector registered on it, as shown above.
To execute a SPARQL update, we need to provide the binding for the template’s ?id parameter. It is passed as the key of the Kafka message, and the data to be added is passed as the value.

In the Kafka installation directory, execute:
bin/kafka-console-producer.sh --bootstrap-server localhost:19092 --topic gdb-update \
  --property parse.key=true --property key.separator="-"
To execute an update, paste this into the same window, and press Enter.
http://example/graph-<http://example/subject> <http://example/predicate> "Now we can make SPARQL updates" <http://example/graph> .
To see the updated data, run the query from above in the Workbench SPARQL editor of the “kafka-test” repository:
SELECT * WHERE { GRAPH ?g { ?s ?p ?o } }
The updated data in the graph should be returned in the results.
Configuration properties¶
The following properties are used to configure the Kafka Sink connector:
Property | Description | Valid values
---|---|---
name | Globally unique name to use for this connector. | String
connector.class | Name or alias of the class for this connector. Must be a subclass of org.apache.kafka.connect.connector.Connector. | String
key.converter | Converter class used to convert between Kafka Connect format and the serialized form that is written to Kafka. This controls the format of the keys in messages written to or read from Kafka, and since this is independent of connectors, it allows any connector to work with any serialization format. | Class
value.converter | Converter class used to convert between Kafka Connect format and the serialized form that is written to Kafka. This controls the format of the values in messages written to or read from Kafka, and since this is independent of connectors, it allows any connector to work with any serialization format. | Class
topics | Comma-separated list of topics to consume. | List
tasks.max | Maximum number of tasks to use for this connector. | Integer
graphdb.server.url | The URL of the GraphDB server. | String
graphdb.server.repository | The GraphDB repository where the connector will write the documents. | String
graphdb.batch.size | The maximum number of documents to be sent from Kafka to GraphDB in one transaction. | Integer
graphdb.auth.type | The authentication type. |
 | The username for basic authentication. | String
 | The password for basic authentication. | String
 | The GraphDB authentication token. | String
graphdb.update.type | The type of the transaction. | ADD, REPLACE_GRAPH, SMART_UPDATE
graphdb.update.rdf.format | The format of the documents sent from Kafka to GraphDB. |
graphdb.batch.commit.limit.ms | The timeout applied per batch that is not full before it is committed. | Long
errors.tolerance | Behavior for tolerating errors during connector operation. | none, all
errors.deadletterqueue.topic.name | The topic name in Kafka brokers to store failed records. Default is blank. | String
errors.deadletterqueue.topic.replication.factor | Replication factor used to create the dead letter queue topic when it does not already exist. | Short
errors.retry.timeout | The maximum duration in milliseconds that a failed operation will be reattempted. | Long
errors.retry.delay.max.ms | The maximum duration in milliseconds between consecutive retry attempts. | Long
bootstrap.servers | The IRIs of the Kafka brokers. | List
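Most of these properties can also be changed on an already registered connector without recreating it, using the standard Kafka Connect REST API. A sketch, assuming the add connector from the examples above and, for instance, a larger batch size; note that PUT replaces the whole configuration, so all properties must be supplied, not just the changed one:

curl -X PUT http://localhost:8083/connectors/kafka-sink-graphdb-add/config \
  -H 'Content-Type: application/json' \
  --data '{
    "connector.class": "com.ontotext.kafka.GraphDBSinkConnector",
    "key.converter": "com.ontotext.kafka.convert.DirectRDFConverter",
    "value.converter": "com.ontotext.kafka.convert.DirectRDFConverter",
    "value.converter.schemas.enable": "false",
    "topics": "gdb-add",
    "tasks.max": 1,
    "offset.storage.file.filename": "/tmp/storage-add",
    "graphdb.server.url": "http://graphdb.example.com:7200",
    "graphdb.server.repository": "kafka-test",
    "graphdb.batch.size": 128,
    "graphdb.batch.commit.limit.ms": 1000,
    "graphdb.auth.type": "NONE",
    "graphdb.update.type": "ADD",
    "graphdb.update.rdf.format": "nq"
  }'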