Kafka GraphDB Connector¶
Note
This feature requires a GraphDB Enterprise license.
Overview and features¶
The Kafka connector provides a means to synchronize changes to the RDF model to any Kafka consumer, staying automatically up-to-date with the GraphDB repository data.
Note
GraphDB supports full-text search options as well.
The Connectors provide synchronization at the entity level, where an entity is defined as having a unique identifier (an IRI) and a set of properties and property values. In terms of RDF, this corresponds to a set of triples that have the same subject. In addition to simple properties (defined by a single triple), the Connectors support property chains. A property chain is defined as a sequence of triples where each triple’s object is the subject of the following triple.
On the Kafka side, the RDF entities are translated to JSON documents.
The main features of the Kafka Connector are:
maintaining a Kafka topic that is always in sync with the data stored in GraphDB;
multiple independent instances per repository;
the entities for synchronization are defined by:
a list of fields (on the Kafka side) and property chains (on the GraphDB side) whose values will be synchronized;
a list of rdf:type’s of the entities for synchronization;
a list of languages for synchronization (the default is all languages);
additional filtering by property and value.
Unlike the Elasticsearch, Solr, and Lucene connectors, the Kafka connector does not have a query interface since Kafka is a simple message queue and does not provide search functionality.
Each feature is described in detail below.
In terms of Kafka terminology and behavior:
Each connector instance must be assigned to a fixed Kafka topic.
The connector is a Kafka producer, and does not have any information about the Kafka consumers.
The partitions are assigned by the Kafka framework and not the connector.
Usage¶
All interactions with the Kafka GraphDB Connector are done through SPARQL queries.
There are three types of SPARQL queries:
INSERT for creating, updating, and deleting connector instances;
SELECT for listing connector instances and querying their configuration parameters;
INSERT/SELECT for storing and querying data as part of the normal GraphDB data workflow.
In general, this corresponds to INSERT that adds or modifies data, and to SELECT that queries existing data.
Each connector implementation defines its own IRI prefix to distinguish
it from other connectors. For the Kafka GraphDB Connector, this
is http://www.ontotext.com/connectors/kafka#
. Each command or
predicate executed by the connector uses this prefix, e.g.,
http://www.ontotext.com/connectors/kafka#createConnector
to
create a connector instance for Kafka.
Individual instances of a connector are distinguished by unique names
that are also IRIs. They have their own prefix to avoid clashing with
any of the command predicates. For Kafka, the instance prefix is
http://www.ontotext.com/connectors/kafka/instance#
.
- Sample data
All examples use the following sample data that describes five fictitious wines: Yoyowine, Franvino, Noirette, Blanquito, and Rozova, as well as the grape varieties required to make these wines. The minimum required ruleset level in GraphDB is RDFS.

@prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> .
@prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#> .
@prefix xsd: <http://www.w3.org/2001/XMLSchema#> .
@prefix wine: <http://www.ontotext.com/example/wine#> .

wine:RedWine rdfs:subClassOf wine:Wine .
wine:WhiteWine rdfs:subClassOf wine:Wine .
wine:RoseWine rdfs:subClassOf wine:Wine .

wine:Merlo
    rdf:type wine:Grape ;
    rdfs:label "Merlo" .

wine:CabernetSauvignon
    rdf:type wine:Grape ;
    rdfs:label "Cabernet Sauvignon" .

wine:CabernetFranc
    rdf:type wine:Grape ;
    rdfs:label "Cabernet Franc" .

wine:PinotNoir
    rdf:type wine:Grape ;
    rdfs:label "Pinot Noir" .

wine:Chardonnay
    rdf:type wine:Grape ;
    rdfs:label "Chardonnay" .

wine:Yoyowine
    rdf:type wine:RedWine ;
    wine:madeFromGrape wine:CabernetSauvignon ;
    wine:hasSugar "dry" ;
    wine:hasYear "2013"^^xsd:integer .

wine:Franvino
    rdf:type wine:RedWine ;
    wine:madeFromGrape wine:Merlo ;
    wine:madeFromGrape wine:CabernetFranc ;
    wine:hasSugar "dry" ;
    wine:hasYear "2012"^^xsd:integer .

wine:Noirette
    rdf:type wine:RedWine ;
    wine:madeFromGrape wine:PinotNoir ;
    wine:hasSugar "medium" ;
    wine:hasYear "2012"^^xsd:integer .

wine:Blanquito
    rdf:type wine:WhiteWine ;
    wine:madeFromGrape wine:Chardonnay ;
    wine:hasSugar "dry" ;
    wine:hasYear "2012"^^xsd:integer .

wine:Rozova
    rdf:type wine:RoseWine ;
    wine:madeFromGrape wine:PinotNoir ;
    wine:hasSugar "medium" ;
    wine:hasYear "2013"^^xsd:integer .
Setup and maintenance¶
Prerequisites¶
- Third-party component versions
This version of the Kafka GraphDB Connector uses Kafka version 3.3.1.
Creating a connector instance¶
Creating a connector instance is done by sending a SPARQL query with the following configuration data:
the name of the connector instance (e.g., my_index);
a Kafka node and topic to synchronize to;
classes to synchronize;
properties to synchronize.
The configuration data has to be provided as a JSON string representation and passed together with the create command.
You can create connectors via a Workbench dialog or by using a SPARQL update query (create command).
If you create the connector via the Workbench, regardless of which of the two ways you use, you will be presented with a pop-up screen showing the connector creation progress.
Using the Workbench¶
Go to the Connectors management view.
Click New Connector in the tab of the respective Connector type you want to create.
Fill out the configuration form.
Execute the CREATE statement from the form by clicking OK. Alternatively, you can view its SPARQL query by clicking View SPARQL Query, and then copy it to execute it manually or integrate it in automation scripts.
Using the create command¶
The create command is triggered by a SPARQL INSERT
with the
kafka:createConnector
predicate, e.g., it creates a connector instance
called my_index
, which synchronizes the wines from the sample data
above.
To be able to use newlines and quotes without the need for escaping,
here we use SPARQL’s multi-line string delimiter consisting of 3 apostrophes: '''...'''
.
You can also use 3 quotes instead: """..."""
.
PREFIX kafka: <http://www.ontotext.com/connectors/kafka#>
PREFIX kafka-inst: <http://www.ontotext.com/connectors/kafka/instance#>
INSERT DATA {
kafka-inst:my_index kafka:createConnector '''
{
"kafkaNode": "localhost:9092",
"kafkaTopic": "my_index",
"types": [
"http://www.ontotext.com/example/wine#Wine"
],
"fields": [
{
"fieldName": "grape",
"propertyChain": [
"http://www.ontotext.com/example/wine#madeFromGrape"
]
},
{
"fieldName": "sugar",
"propertyChain": [
"http://www.ontotext.com/example/wine#hasSugar"
]
},
{
"fieldName": "year",
"propertyChain": [
"http://www.ontotext.com/example/wine#hasYear"
]
}
]
}
''' .
}
The above command creates a new Kafka connector instance that connects to the Kafka instance accessible at port 9092 on the localhost, as specified by the kafkaNode key.
The "types"
key defines the RDF type of the entities to synchronize and,
in the example, it is only entities of the type http://www.ontotext.com/example/wine#Wine
(and its subtypes if RDFS or higher-level reasoning is enabled). The "fields"
key defines the mapping from RDF to
Kafka. The basic building block is the property chain, i.e., a
sequence of RDF properties where the object of each property is the
subject of the following property. In the example, three bits of
information are mapped - the grape the wines are made of, sugar content,
and year. Each chain is assigned a short and convenient field name:
“grape”, “sugar”, and “year”. The field names are later used in the
queries.
The field grape
is an example of a property chain composed of more than one
property. First, we take the wine’s madeFromGrape
property, the object
of which is an instance of the type Grape, and then we take the
rdfs:label
of this instance. The fields sugar
and year
are both composed of a
single property that links the value directly to the wine.
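For illustration, here is a sketch of the kind of JSON document the connector would produce for wine:Franvino with the above configuration; the IRI of the wine is used as the key of the Kafka record. The exact serialization (e.g., arrays vs. single values, or how the xsd:integer year literal is rendered) depends on the field options and the datatype mapping described later in this document:
{
  "grape": ["Merlo", "Cabernet Franc"],
  "sugar": "dry",
  "year": "2012"
}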
Working with a secured Kafka broker¶
GraphDB can connect to a secured Kafka broker using the SASL/PLAIN authentication mechanism. To configure it, set the kafkaPlainAuthUsername and kafkaPlainAuthPassword parameters. Since the password will be transmitted in clear text, it is recommended to enable SSL on the Kafka broker, and accordingly set the kafkaSSL parameter to true.
Instead of supplying the username and password as part of the connector instance configuration, you can also implement a custom authenticator class and set it via the authenticationConfiguratorClass
option. See these connector authenticator examples for more information and example projects that implement such a custom class.
There is no explicitly configurable support for other authentication mechanisms supported by Kafka. It should be possible to configure most of them by supplying the relevant Kafka producer properties via the kafkaProducerConfig parameter.
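For example, a connector pointing to a secured broker could be created like this (a sketch only — the broker address, credentials, and field selection are placeholders):
PREFIX kafka: <http://www.ontotext.com/connectors/kafka#>
PREFIX kafka-inst: <http://www.ontotext.com/connectors/kafka/instance#>
INSERT DATA {
    kafka-inst:my_secured_index kafka:createConnector '''
{
  "kafkaNode": "kafka.example.com:9093",
  "kafkaTopic": "my_secured_index",
  "kafkaSSL": true,
  "kafkaPlainAuthUsername": "graphdb",
  "kafkaPlainAuthPassword": "secret",
  "types": [
    "http://www.ontotext.com/example/wine#Wine"
  ],
  "fields": [
    {
      "fieldName": "sugar",
      "propertyChain": [
        "http://www.ontotext.com/example/wine#hasSugar"
      ]
    }
  ]
}
''' .
}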
Dropping a connector instance¶
Dropping a connector instance removes all references to its external store from GraphDB as well as the Kafka index associated with it.
The drop command is triggered by a SPARQL INSERT
with the
dropConnector
predicate where the name of the connector instance has
to be in the subject position, e.g., this removes the connector
my_index
:
PREFIX kafka: <http://www.ontotext.com/connectors/kafka#>
PREFIX kafka-inst: <http://www.ontotext.com/connectors/kafka/instance#>
INSERT DATA {
kafka-inst:my_index kafka:dropConnector [] .
}
You can also force drop a connector in case a normal delete does not work. The force delete will remove the connector even if part of the operation fails. Go to the Connectors management view, where you will see the already existing connectors that you have created. Click the delete icon, and check Force delete in the dialog box.
Retrieving the create options for a connector instance¶
You can view the options string that was used to create a particular connector instance with the following query:
PREFIX kafka: <http://www.ontotext.com/connectors/kafka#>
PREFIX kafka-inst: <http://www.ontotext.com/connectors/kafka/instance#>
SELECT ?createString {
kafka-inst:my_index kafka:listOptionValues ?createString .
}
Listing available connector instances¶
In the Connectors management view¶
Existing Connector instances are shown below the New Connector button. Click the name of an instance to view its configuration and SPARQL query, or click the repair or delete icons to perform these operations. Click the copy icon to copy the connector definition query to your clipboard.
With a SPARQL query¶
Listing connector instances returns all previously created instances. It
is a SELECT
query with the listConnectors
predicate:
PREFIX kafka: <http://www.ontotext.com/connectors/kafka#>
SELECT ?cntUri ?cntStr {
?cntUri kafka:listConnectors ?cntStr .
}
?cntUri
is bound to the prefixed IRI of the connector instance that
was used during creation, e.g., http://www.ontotext.com/connectors/kafka/instance#my_index
,
while ?cntStr
is bound to a string, representing the part after the
prefix, e.g., "my_index"
.
Instance status check¶
The internal state of each connector instance can be queried using a
SELECT
query and the connectorStatus
predicate:
PREFIX kafka: <http://www.ontotext.com/connectors/kafka#>
SELECT ?cntUri ?cntStatus {
?cntUri kafka:connectorStatus ?cntStatus .
}
?cntUri
is bound to the prefixed IRI of the connector instance,
while ?cntStatus
is bound to a string representation of the status
of the connector represented by this IRI. The status is key-value based.
Working with data¶
Adding, updating, and deleting data¶
From the user point of view, all synchronization happens transparently
without using any additional predicates or naming a specific store
explicitly, i.e., you must simply execute standard SPARQL
INSERT
/DELETE
queries. This is achieved by intercepting all changes in
the plugin and determining which Kafka documents need to be updated.
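For example, inserting a new wine with a plain SPARQL update like the one below (wine:Grandiose is a made-up wine, not part of the sample data) is enough for the connector to send an updated document to the configured Kafka topic:
PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
PREFIX xsd: <http://www.w3.org/2001/XMLSchema#>
PREFIX wine: <http://www.ontotext.com/example/wine#>
INSERT DATA {
    wine:Grandiose rdf:type wine:RedWine ;
        wine:madeFromGrape wine:Merlo ;
        wine:hasSugar "dry" ;
        wine:hasYear "2015"^^xsd:integer .
}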
List of creation parameters¶
The creation parameters define how a connector instance is created by
the kafka:createConnector
predicate. Some are required and some are optional.
All parameters are provided together in a JSON object, where the
parameter names are the object keys. Parameter values may be simple JSON
values such as a string or a boolean, or they can be lists or objects.
All of the creation parameters can also be set conveniently from the Create Connector user interface without any knowledge of JSON.
readonly
(boolean), optional, read-only mode. A read-only connector will index all existing data in the repository at creation time, but, unlike non-read-only connectors, it will:
Not react to updates. Changes will not be synced to the connector.
Not keep any extra structures (such as the internal Lucene index for tracking updates to chains).
The only way to index changes in data after the connector has been created is to repair (or drop/recreate) the connector.
importGraph
(boolean), optional, specifies that the RDF data from which to create the connector is in a special virtual graph. Used to make a Kafka index from temporary RDF data inserted in the same transaction. It requires read-only mode and creates a connector whose data will come from statements inserted into a special virtual graph instead of data contained in the repository. The virtual graph is kafka:graph, where the prefix kafka: is as defined before. The data have to be inserted into this graph before the connector create statement is executed.
Both the insertion into the special graph and the create statement must be in the same transaction. In the GraphDB Workbench, this can be done by pasting them one after another in the SPARQL editor and putting a semicolon at the end of the first INSERT. This functionality requires read-only mode.
PREFIX kafka: <http://www.ontotext.com/connectors/kafka#>
INSERT {
    GRAPH kafka:graph {
        ...
    }
} WHERE {
    ...
};

PREFIX kafka: <http://www.ontotext.com/connectors/kafka#>
PREFIX kafka-inst: <http://www.ontotext.com/connectors/kafka/instance#>
INSERT DATA {
    kafka-inst:my_index kafka:createConnector '''
{
  "readonly": true,
  "importGraph": true,
  "fields": [],
  "languages": [],
  "types": []
}
''' .
}
importFile
(string), optional, an RDF file with data from which to create the connector. Creates a connector whose data will come from an RDF file on the file system instead of data contained in the repository. The value must be the full path to the RDF file. This functionality requires read-only mode.
detectFields
(boolean), optional, detects fields. This mode introduces automatic field detection when creating a connector. You can omit specifying fields in JSON. Instead, you will get automatic fields: each corresponds to a single predicate, and its field name is the same as the predicate (so you need to use escaping when issuing Kafka queries).
In this mode, specifying types is optional too. If types are not provided, then all types will be indexed. This mode requires importGraph or importFile.
Once the connector is created, you can inspect the detected fields in the Connector management section of the Workbench.
kafkaNode
(string), required, the Kafka instance to sync to. As Kafka is a third-party service, you have to specify the node where it is running. The node value has the form hostname.domain:port, e.g., localhost:9092. No default value. Can be updated at runtime without having to rebuild the index.
kafkaTopic
(string), required, the Kafka topic to send documents to. No default value.
kafkaSSL
(boolean), optional, controls whether to use an SSL connection to the Kafka broker. False by default. Can be updated at runtime without having to rebuild the index.
kafkaPlainAuthUsername
(string), optional, supplies the username for Kafka SASL PLAIN authentication. No default value. Can be updated at runtime without having to rebuild the index.
kafkaPlainAuthPassword
(string), optional, supplies the password for Kafka SASL PLAIN authentication. No default value. Can be updated at runtime without having to rebuild the index.
bulkUpdateBatchSize
(integer), controls the maximum batch size in bytes and corresponds to the Kafka producer config batch.size. Default value is 1,048,576 (1 megabyte). Can be updated at runtime without having to rebuild the index.
bulkUpdateRequestSize
(integer), controls the maximum request size (and consequently the maximum size per document) in bytes. Default value is 1,048,576 (1 megabyte). Can be updated at runtime without having to rebuild the index.
authenticationConfiguratorClass
optional, provides custom authentication behavior
kafkaCompressionType
(string), sets the compression to use when sending documents to Kafka. One of none, gzip, lz4, or snappy; the default is snappy. This corresponds to the Kafka producer config property compression.type. Can be updated at runtime without having to rebuild the index.
kafkaProducerId
(string), an optional identifier that allows for separate Kafka producers with different options to the same Kafka broker. No default – all instances to the same Kafka broker will use a shared Kafka producer and thus must have the same options. See also Producer sharing and Conflict resolution.
kafkaProducerConfig
(JSON), optional, the settings for creating the Kafka producer. This option is passed directly to the Kafka producer when it is instantiated. Each key is a Kafka producer configuration property. Some config keys, e.g., transactional.id, are not allowed here. No default. Can be updated at runtime without having to rebuild the index.
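For example, the option could look like this (a sketch; acks and linger.ms are standard Kafka producer properties and the values are only illustrative):
"kafkaProducerConfig": {
    "acks": "all",
    "linger.ms": 100
}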
kafkaIgnoreDeleteAll
(boolean), optional, a flag that, when selected, will not notify Kafka when all repository statements are removed. GraphDB handles the removal of all statements as a special operation that is manifested as sending a Kafka record with a NULL key and NULL value. If this flag is true, no such record will be sent. False by default.
kafkaPropagateConfig
(boolean), optional, a non-persisted flag that, when selected, will force propagating the Kafka config to other connector instances using the same Kafka broker. False by default. See also Producer sharing and Conflict resolution. Can be updated at runtime without having to rebuild the index.
types
(list of IRIs), required, specifies the types of entities to sync. The RDF types of entities to sync are specified as a list of IRIs. At least one type IRI is required.
Use the pseudo-IRI $any to sync entities that have at least one RDF type.
Use the pseudo-IRI $untyped to sync entities regardless of whether they have any RDF type.
languages
(list of strings), optional, valid languages for literals. RDF data is often multilingual, but only some of the languages represented in the literal values can be mapped. This can be done by specifying a list of language ranges to be matched to the language tags of literals according to RFC 4647, Section 3.3.1, Basic Filtering. In addition, an empty range can be used to include literals that have no language tag. The list of language ranges maps all existing literals that have matching language tags.
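For example, to map only English and German literals plus literals without a language tag, the option could look like this (a sketch):
"languages": ["en", "de", ""]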
fields
(list of field objects), required, defines the mapping from RDF to Kafka. The fields specify exactly which parts of each entity will be synchronized as well as the specific details on the connector side. The field is the smallest synchronization unit and it maps a property chain from GraphDB to a field in Kafka. The fields are specified as a list of field objects. At least one field object is required. Each field object has further keys that specify details.
fieldName
(string), required, the name of the field in Kafka. The name of the field defines the mapping on the connector side. It is specified by the key fieldName with a string value. The field name is used as the key in the JSON document that will be sent to Kafka.
fieldNameTransform
(one of none, predicate, or predicate.localName), optional, none by default. Defines an optional transformation of the field name. Although fieldName is always required, it is ignored if fieldNameTransform is predicate or predicate.localName.
none: The field name is supplied via the fieldName option.
predicate: The field name is equal to the full IRI of the last predicate of the chain, e.g., if the last predicate was http://www.w3.org/2000/01/rdf-schema#label, then the field name will be http://www.w3.org/2000/01/rdf-schema#label too.
predicate.localName: The field name is derived from the local name of the IRI of the last predicate of the chain, e.g., if the last predicate was http://www.w3.org/2000/01/rdf-schema#comment, then the field name will be comment.
See Indexing all literals in distinct fields for an example.
propertyChain
(list of IRIs), required, defines the property chain to reach the value. The property chain defines the mapping on the GraphDB side. A property chain is defined as a sequence of triples where the entity IRI is the subject of the first triple, its object is the subject of the next triple, etc. In this model, a property chain with a single element corresponds to a direct property defined by a single triple. Property chains are specified as a list of IRIs where at least one IRI must be provided.
The IRI of the document will be synchronized as the key in the Kafka record.
See Copy fields for defining multiple fields with the same property chain.
See Multiple property chains per field for defining a field whose values are populated from more than one property chain.
See Indexing language tags for defining a field whose values are populated with the language tags of literals.
See Indexing the IRI of an entity for defining a field whose values are populated with the IRI of the indexed entity.
See Wildcard literal indexing for defining a field whose values are populated with literals regardless of their predicate.
valueFilter
(string), optional, specifies the value filter for the field. See also Entity filtering.
documentFilter
(string), optional, specifies the nested document filter for the field (only for fields that define nested documents). See also Entity filtering.
defaultValue
(string), optional, specifies a default value for the field. The default value (defaultValue) provides means for specifying a default value for the field when the property chain has no matching values in GraphDB. The default value can be a plain literal, a literal with a datatype (xsd: prefix supported), a literal with language, or an IRI. It has no default value.
indexed
(boolean), optional, default true. If indexed, a field will be included in the JSON document sent to Kafka. True by default.
If true, this option corresponds to "index" = true. If false, it corresponds to "index" = false.
multivalued
(boolean), optional, default true. RDF properties and synchronized fields may have more than one value. If multivalued is set to true, all values will be synchronized to Kafka. If set to false, only a single value will be synchronized. True by default.
ignoreInvalidValues
(boolean), optional, default false. Per-field option that controls what happens when a value cannot be converted to the requested (or previously detected) type. False by default.
Example use: when an invalid date literal like "2021-02-29"^^xsd:date (2021 is not a leap year) needs to be indexed as a date, or when an IRI needs to be indexed as a number.
Note that some conversions are always valid, for example a literal or an IRI to a string field. When true, such values will be skipped with a note in the logs. When false, such values will break the transaction.
array
(boolean), optional, default false. Normally, Kafka creates an array only if more than one value is present for a given field. If array is set to true, Kafka will always create an array even for single values. If set to false, Kafka will create arrays for multiple values only. False by default.
datatype
(string), optional, the manual datatype override. By default, the Kafka GraphDB Connector uses the datatype of literal values to determine how they should be mapped to Kafka types. For more information on the supported datatypes, see Datatype mapping.
The mapping can be overridden through the property "datatype", which can be specified per field. The value of datatype can be any of the xsd: types supported by the automatic mapping or a native Kafka type prefixed by native:, e.g., both xsd:long and native:long map to the long type in Kafka.
objectFields
(objects array), optional, nested object mapping. When native:object is used as a datatype value, provide a mapping for the nested object’s fields. If datatype is not provided, then native:object will be assumed.
Nested objects support further nested objects with a limit of five levels of nesting.
startFromParent
(integer), optional, default 0. Start processing the property chain from the N-th parent instead of the root of the current nested object. 0 is the root of the current nested object, 1 is the parent of the nested object, 2 is the parent of the parent, and so on.
valueFilter
(string), optional, specifies the top-level value filter for the document. See also Entity filtering.
documentFilter
(string), optional, specifies the top-level document filter for the document. See also Entity filtering.
Updating parameters at runtime¶
As mentioned above, the following connector parameters can be updated at runtime without having to rebuild the index:
kafkaNode
kafkaSSL
kafkaProducerConfig
kafkaCompressionType
kafkaPlainAuthUsername
kafkaPlainAuthPassword
bulkUpdateBatchSize
bulkUpdateRequestSize
kafkaPropagateConfig
This can be done by executing the following SPARQL update, here with examples for changing the user and password:
PREFIX kafka: <http://www.ontotext.com/connectors/kafka#>
PREFIX kafka-inst: <http://www.ontotext.com/connectors/kafka/instance#>
INSERT DATA {
kafka-inst:my_index kafka:updateConnector '''
{
"kafkaPlainAuthUsername": "foo"
"kafkaPlainAuthPassword": "bar"
}
''' .
}
Special field definitions¶
Nested objects¶
Nested objects are JSON objects that are used as values in the main document or other nested objects (up to five levels of nesting is possible). They are defined with the objectFields option.
Having the following data consisting of children and grandchildren relations:
<urn:John>
a <urn:Person> ;
<urn:name> "John" ;
<urn:gender> <urn:Male> ;
<urn:age> 60 ;
<urn:hasSpouse> <urn:Mary> ;
<urn:hasChild> <urn:Billy> ;
<urn:hasChild> <urn:Annie> .
<urn:Mary>
a <urn:Person> ;
<urn:name> "Mary" ;
<urn:gender> <urn:Female> ;
<urn:age> 58 ;
<urn:hasSpouse> <urn:John> ;
<urn:hasChild> <urn:Billy> .
<urn:Eva>
a <urn:Person> ;
<urn:name> "Eva" ;
<urn:gender> <urn:Female> ;
<urn:age> 45 ;
<urn:hasChild> <urn:Annie> .
<urn:Billy>
a <urn:Person> ;
<urn:name> "Billy" ;
<urn:gender> <urn:Male> ;
<urn:age> 35 ;
<urn:hasChild> <urn:Tylor> ;
<urn:hasChild> <urn:Melody> .
<urn:Annie>
a <urn:Person> ;
<urn:name> "Annie" ;
<urn:gender> <urn:Female> ;
<urn:age> 28 ;
<urn:hasChild> <urn:Sammy> .
<urn:Tylor>
a <urn:Person> ;
<urn:name> "Tylor" ;
<urn:gender> <urn:Male> ;
<urn:age> 5 .
<urn:Melody>
a <urn:Person> ;
<urn:name> "Melody" ;
<urn:gender> <urn:Female> ;
<urn:age> 2 .
<urn:Sammy>
a <urn:Person> ;
<urn:name> "Sammy" ;
<urn:gender> <urn:Male> ;
<urn:age> 10 .
<urn:Male> <urn:label> "male" .
<urn:Female> <urn:label> "female" .
We can create a nested objects index that consists of children and grandchildren with their corresponding fields defining their gender and age:
{
"fields": [
{
"fieldName": "name",
"propertyChain": [
"urn:name"
]
},
{
"fieldName": "age",
"propertyChain": [
"urn:age"
],
"datatype": "xsd:long"
},
{
"fieldName": "hasSpouse",
"propertyChain": [
"urn:hasSpouse"
]
},
{
"fieldName": "gender",
"propertyChain": [
"urn:gender",
"urn:label"
]
},
{
"fieldName": "children",
"propertyChain": [
"urn:hasChild"
],
"datatype": "native:object",
"objectFields": [
{
"fieldName": "id",
"propertyChain": [
"$self"
]
},
{
"fieldName": "name",
"propertyChain": [
"urn:name"
]
},
{
"fieldName": "age",
"propertyChain": [
"urn:age"
],
"datatype": "xsd:long"
},
{
"fieldName": "gender",
"propertyChain": [
"urn:gender",
"urn:label"
]
},
{
"fieldName": "children",
"propertyChain": [
"urn:hasChild"
],
"objectFields": [
{
"fieldName": "id",
"propertyChain": [
"$self"
]
},
{
"fieldName": "name",
"propertyChain": [
"urn:name"
]
},
{
"fieldName": "age",
"propertyChain": [
"urn:age"
],
"datatype": "xsd:long"
}
]
}
]
},
{
"fieldName": "grandChildren",
"valueFilter": "$this -> type in (<urn:Person>)",
"propertyChain": [
"urn:hasChild",
"urn:hasChild"
],
"datatype": "native:object",
"objectFields": [
{
"fieldName": "id",
"propertyChain": [
"$self"
]
},
{
"fieldName": "name",
"propertyChain": [
"urn:name"
]
},
{
"fieldName": "age",
"propertyChain": [
"urn:age"
],
"datatype": "xsd:long"
},
{
"fieldName": "gender",
"propertyChain": [
"urn:gender",
"urn:label"
]
}
]
}
],
"types": [
"urn:Person"
],
"kafkaNode": ...,
"kafkaTopic": ...
}
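For illustration, with the above configuration and data, the connector would produce a document along these lines for <urn:John> (a sketch only — whether single values are emitted as scalars or arrays depends on the array option, and the Kafka record key would be the IRI urn:John):
{
  "name": "John",
  "age": 60,
  "hasSpouse": "urn:Mary",
  "gender": "male",
  "children": [
    {
      "id": "urn:Billy",
      "name": "Billy",
      "age": 35,
      "gender": "male",
      "children": [
        { "id": "urn:Tylor", "name": "Tylor", "age": 5 },
        { "id": "urn:Melody", "name": "Melody", "age": 2 }
      ]
    },
    {
      "id": "urn:Annie",
      "name": "Annie",
      "age": 28,
      "gender": "female",
      "children": { "id": "urn:Sammy", "name": "Sammy", "age": 10 }
    }
  ],
  "grandChildren": [
    { "id": "urn:Tylor", "name": "Tylor", "age": 5, "gender": "male" },
    { "id": "urn:Melody", "name": "Melody", "age": 2, "gender": "female" },
    { "id": "urn:Sammy", "name": "Sammy", "age": 10, "gender": "male" }
  ]
}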
Copy fields¶
Often, it is convenient to synchronize one and the same data multiple
times with different settings to accommodate for different use cases. The Kafka GraphDB
Connector has explicit support for fields that copy their value from
another field. This is achieved by specifying a single element in the
property chain of the form @otherFieldName
, where otherFieldName
is
another non-copy field. Take the following example:
...
"fields": [
{
"fieldName": "grape",
"facet": false,
"propertyChain": [
"http://www.ontotext.com/example/wine#madeFromGrape",
"http://www.w3.org/2000/01/rdf-schema#label"
]
},
{
    "fieldName": "whiteGrape",
    "propertyChain": [
        "@grape"
    ],
    "valueFilter": "$this -> type in (<wine:WhiteGrape>)"
}
],
...
The snippet creates a field “grape” containing all grapes, and another field “whiteGrape”. Both fields are populated with the same values initially and “whiteGrape” is defined as a copy field that refers to the field “grape”. The field “whiteGrape” is additionally filtered so that only certain grape varieties will be synchronized.
Note
The connector handles copy fields more efficiently than specifying a field with exactly the same property chain as another field.
Multiple property chains per field¶
Sometimes, you have to work with data models that define the same concept (in terms of what you want to index in Kafka) with more than one property chain, e.g., the concept of “name” could be defined as a single canonical name, multiple historical names and some unofficial names. If you want to index these together as a single field in Kafka, you can define this as a multiple property chains field.
Fields with multiple property chains are defined as a set of separate virtual fields that will be merged into a single physical field when indexed. Virtual fields are distinguished by the suffix $xyz, where xyz is any alphanumeric sequence of convenience. For example, we can define the fields name$1 and name$2 like this:
...
"fields": [
{
    "fieldName": "name$1",
    "propertyChain": [
        "http://www.ontotext.com/example#canonicalName"
    ]
},
{
    "fieldName": "name$2",
    "propertyChain": [
        "http://www.ontotext.com/example#historicalName"
    ]
},
...
...
The values of the fields name$1
and name$2
will be merged
and synchronized to the field name
in Kafka.
Note
You cannot mix suffixed and unsuffixed fields with the same name,
e.g., if you defined myField$new
and myField$old
, you cannot have
a field called just myField
.
Filters and fields with multiple property chains¶
Filters can be used with fields defined with multiple property chains. Both the physical field values and the individual virtual field values are available:
Physical fields are specified without the suffix, e.g.,
?myField
Virtual fields are specified with the suffix, e.g.,
?myField$2
or?myField$alt
.
Note
Physical fields cannot be combined with parent()
as their values
come from different property chains. If you really need to filter
the same parent level, you can rewrite parent(?myField) in (<urn:x>, <urn:y>)
as parent(?myField$1) in (<urn:x>, <urn:y>) || parent(?myField$2)
in (<urn:x>, <urn:y>) || parent(?myField$3) ...
and surround it with
parentheses if it is a part of a bigger expression.
Indexing language tags¶
The language tag of an RDF literal can be indexed by specifying a property chain, where the last element is
the pseudo-IRI lang()
. The property preceding lang()
must lead to a literal value. For example:
PREFIX kafka: <http://www.ontotext.com/connectors/kafka#>
PREFIX kafka-inst: <http://www.ontotext.com/connectors/kafka/instance#>
INSERT DATA {
kafka-inst:my_index kafka:createConnector '''
{
"kafkaNode": "localhost:9092",
"kafkaTopic": "my_index",
"types": ["http://www.ontotext.com/example#gadget"],
"fields": [
{
"fieldName": "name",
"propertyChain": [
"http://www.ontotext.com/example#name"
]
},
{
"fieldName": "nameLanguage",
"propertyChain": [
"http://www.ontotext.com/example#name",
"lang()"
]
}
]
}
''' .
}
The above connector will index the language tag of each literal value of the property http://www.ontotext.com/example#name
into the field nameLanguage
.
Indexing named graphs¶
The named graph of a given value can be indexed by ending a property chain with the special pseudo-URI graph()
. Indexing the named graph of the value instead of the value itself allows searching by named graph.
PREFIX kafka: <http://www.ontotext.com/connectors/kafka#>
PREFIX kafka-inst: <http://www.ontotext.com/connectors/kafka/instance#>
INSERT DATA {
kafka-inst:my_index kafka:createConnector '''
{
"kafkaNode": "localhost:9092",
"kafkaTopic": "my_index",
"types": ["http://www.ontotext.com/example#gadget"],
"fields": [
{
"fieldName": "name",
"propertyChain": [
"http://www.ontotext.com/example#name"
]
},
{
"fieldName": "nameGraph",
"propertyChain": [
"http://www.ontotext.com/example#name",
"graph()"
]
}
]
}
''' .
}
The above connector will index the named graph of each value of the property http://www.ontotext.com/example#name
into the field nameGraph
.
Wildcard literal indexing¶
In this mode, the last element of a property chain is a wildcard that will match any predicate that leads to a literal value.
Use the special pseudo-IRI $literal
as the last element of the property chain to activate it.
Note
Currently, it really means any literal, including literals with data types.
For example:
{
"fields" : [ {
"propertyChain" : [ "$literal" ],
"fieldName" : "name"
}, {
"propertyChain" : [ "http://example.com/description", "$literal" ],
"fieldName" : "description"
}
...
}
See Indexing all literals for a detailed example.
Indexing the IRI of an entity¶
Sometimes you may need the IRI of each entity (e.g., http://www.ontotext.com/example/wine#Franvino
from our
small example dataset) indexed as a regular field. This can be achieved by specifying a property chain with a single
property referring to the pseudo-IRI $self
. For example:
PREFIX kafka: <http://www.ontotext.com/connectors/kafka#>
PREFIX kafka-inst: <http://www.ontotext.com/connectors/kafka/instance#>
INSERT DATA {
kafka-inst:my_index kafka:createConnector '''
{
"kafkaNode": "localhost:9092",
"kafkaTopic": "my_index",
"types": [
"http://www.ontotext.com/example/wine#Wine"
],
"fields": [
{
"fieldName": "entityId",
"propertyChain": [
"$self"
]
},
{
"fieldName": "grape",
"propertyChain": [
"http://www.ontotext.com/example/wine#madeFromGrape",
"http://www.w3.org/2000/01/rdf-schema#label"
]
}
]
}
''' .
}
The above connector will index the IRI of each wine into the field entityId
.
Note
Note that GraphDB will also use the IRI of each entity as the ID of each document in Kafka,
which is represented by the field id
.
Datatype mapping¶
The Kafka GraphDB Connector maps different types of RDF values to different types of Kafka values according to the basic type of the RDF value (IRI or literal) and the datatype of literals. The auto-detection uses the following mapping:
RDF value | RDF datatype | JSON type
---|---|---
IRI | n/a | string
literal | any type not explicitly handled below | string
literal | numeric types intended for indexing as numbers, e.g., xsd:long, xsd:double, xsd:float | number
literal | xsd:boolean | boolean
literal | date and time types, e.g., xsd:date, xsd:dateTime, xsd:gYear | string (ISO-formatted value)
Note
For any given field, the automatic mapping uses the first value it sees. This works fine for clean datasets but might lead to problems if your dataset has non-normalized data, e.g., the first value has no datatype but other values have.
It is therefore recommended to set datatype to a fixed value, e.g., xsd:date
.
Please note that the commonly used xsd:integer
and xsd:decimal
datatypes are not indexed as numbers because they represent infinite precision numbers.
You can override that by using the datatype option to cast to xsd:long, xsd:double, xsd:float
as appropriate.
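For example, to index the year values from the sample data (typed as xsd:integer) as numbers, the field definition could cast them to xsd:long (a sketch):
{
  "fieldName": "year",
  "propertyChain": [
    "http://www.ontotext.com/example/wine#hasYear"
  ],
  "datatype": "xsd:long"
}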
Date and time conversion¶
RDF and ISO use slightly different models for representing dates and times, even though the values might look very similar.
Years in RDF values use the XSD format and are era years, where positive values denote the common era and negative values denote years before the common era. There is no year zero.
Years in the ISO format are proleptic years, i.e., positive values denote years from the common era with any previous eras just going down by one mathematically so there is year zero.
In short:
year 2020 CE = year 2020 in XSD = year 2020 in ISO.
…
year 1 CE = year 1 in XSD = year 1 in ISO.
year 1 BCE = year -1 in XSD = year 0 in ISO.
year 2 BCE = year -2 in XSD = year -1 in ISO.
…
All years coming from RDF literals will be converted to ISO before sending to Kafka.
Both XSD and ISO date and time values support timezones. In addition to that, XSD defines the lack of a timezone as undetermined. Since we do not want to have any undetermined state in the indexing system, we define the undetermined time zone as UTC, i.e., "2020-02-14T12:00:00"^^xsd:dateTime
is equivalent to "2020-02-14T12:00:00Z"^^xsd:dateTime
(Z is the UTC timezone, also known as +00:00).
Also note that XSD dates and partial dates, e.g., xsd:gYear
values, may have a timezone, which leads to additional complications. E.g., "2020+02:00"^^xsd:gYear
(the year 2020 in the +02:00 timezone) will be normalized to 2019-12-31T22:00:00Z
(the previous year!) if strict timezone adherence is followed. We have chosen to ignore the timezone on any values that do not have an associated time value, e.g.:
"2020-02-15+02:00"^^xsd:date
"2020-02+02:00"^^xsd:gYearMonth
"2020+02:00"^^xsd:gYear
All of the above will be treated as if they specified UTC as their timezone.
Entity filtering¶
The Kafka connector supports four kinds of entity filters used to fine-tune the set of entities
and/or individual values for the configured fields, based on the field
value. Entities and field values are synchronized to Kafka if,
and only if, they pass the filter. The filters are similar to a
FILTER()
inside a SPARQL query but not exactly the same. In them, each configured
field can be referred to by prefixing it with a
?
, much like referring to a variable in SPARQL.
Types of filters¶
- Top-level value filter
The top-level value filter is specified via valueFilter. It is evaluated prior to anything else, when only the document ID is known, and it may not refer to any field names but only to the special field $this that contains the current document ID. Failing to pass this filter removes the entire document early in the indexing process, and it can be used to introduce more restrictions similar to the built-in filtering by type via the types property.
- Top-level document filter
The top-level document filter is specified via documentFilter. This filter is evaluated last, when all of the document has been collected, and it decides whether to include the document in the index. It can be used to enforce global document restrictions, e.g., certain fields are required or a document needs to be indexed only if a certain field value meets specific conditions.
- Per-field value filter
The per-field value filter is specified via valueFilter inside the field definition of the field whose values are to be filtered. The filter is evaluated while collecting the data for the field, when each field value becomes available.
The variable that contains the field value is $this. Other field names can be used to filter the current field’s value based on the value of another field, e.g., $this > ?age will compare the current field value to the value of the field age (see also Two-variable filtering). Failing to pass the filter will remove the current field value.
On nested documents, the per-field value filter can be used to remove the entire nested document early in the indexing process, e.g., by checking the type of the nested document via the next hop with rdf:type.
- Nested document filter
The nested document filter is specified via documentFilter inside the field definition of the field that defines the root of a nested document. The filter is evaluated after the entire nested document has been collected. Failing to pass this filter removes the entire nested document.
Inside a nested document filter, the field names are within the context of the nested document and not within the context of the top-level document. For example, if we have a field children that defines a nested document, and we use a filter like ?age < "10"^^xsd:int, we will be referring to the field children.age. We can use the prefix $outer. one or more times to refer to field values from the outer document (from the viewpoint of the nested document). For example, $outer.age > "25"^^xsd:int will refer to the age field that is a sibling of the children field.
Other than the above differences, the nested document filter is equivalent to the top-level document filter from the viewpoint of the nested document.
See also Migrating from GraphDB 9.x.
Filter operators¶
The filter operators are used to test if the value of a given field satisfies a certain condition.
Field comparisons are done on original RDF values before they are converted to Kafka values using datatype mapping.
Operator | Meaning
---|---
?var in (value1, value2, ...) | Tests if the field var’s value is one of the specified values. Values that do not match are treated as if they were not present in the repository. Example: ?status in ("active", "new")
?var not in (value1, value2, ...) | The negated version of the in-operator. Example: ?status not in ("archived")
bound(?var) | Tests if the field var has a valid value. Example: bound(?name)
isExplicit(?var) | Tests if the field var’s value came from an explicit statement rather than an inferred one. Example: isExplicit(?name)
?var = value (equal to), ?var != value (not equal to), ?var > value (greater than), ?var >= value (greater than or equal to), ?var < value (less than), ?var <= value (less than or equal to) | RDF value comparison operators that compare RDF values similarly to the equivalent SPARQL operators. The field var’s value will be compared to the specified RDF value. When comparing RDF values that are literals, their datatypes must be compatible, e.g., xsd:integer and xsd:long but not xsd:string and xsd:date. Values that do not match are treated as if they were not present in the repository. Examples: given that height’s value is "150"^^xsd:int and dateOfBirth’s value is "1989-12-31"^^xsd:date, then: ?height = "150"^^xsd:int is true; ?height = "150"^^xsd:long is true; ?height = "150" is false; ?height != "151"^^xsd:int is true; ?height != "150" is true; ?height > "150"^^xsd:int is false; ?height >= "150"^^xsd:int is true; ?dateOfBirth < "1990-01-01"^^xsd:date is true
regex(?var, "pattern") or regex(?var, "pattern", "i") | Tests if the field var’s value matches the given regular expression pattern. If the “i” flag option is present, the match operates in case-insensitive mode. Values that do not match are treated as if they were not present in the repository. Example: regex(?name, "^mrs?", "i")
expr1 || expr2, expr1 or expr2 | Logical disjunction of expressions. Examples: bound(?name) || bound(?company); bound(?name) or bound(?company)
expr1 && expr2, expr1 and expr2 | Logical conjunction of expressions. Examples: bound(?status) && ?status in ("active", "new"); bound(?status) and ?status in ("active", "new")
!expr | Logical negation of an expression. Example: !bound(?company)
( expr ) | Grouping of expressions. Example: (bound(?name) or bound(?company)) && bound(?address)
Filter modifiers¶
In addition to the operators, there are some constructions that can be used to write filters based not on the values of a field but on values related to them:
- Accessing the previous element in the chain
The construction parent(?var) is used for going to a previous level in a property chain. It can be applied recursively as many times as needed, e.g., parent(parent(parent(?var))) goes back in the chain three times. The effective value of parent(?var) can be used with the in or not in operator like this: parent(?company) in (<urn:a>, <urn:b>), or with the bound operator like this: bound(parent(?var)).
- Accessing an element beyond the chain
The construction ?var -> uri (alternatively, ?var o uri or just ?var uri) is used to access additional values that are accessible through the property uri. In essence, this construction corresponds to the triple pattern value uri ?effectiveValue, where ?value is a value bound by the field var. The effective value of ?var -> uri can be used with the in or not in operator like this: ?company -> rdf:type in (<urn:c>, <urn:d>). It can be combined with parent() like this: parent(?company) -> rdf:type in (<urn:c>, <urn:d>). The same construction can be applied to the bound operator like this: bound(?company -> <urn:hasBranch>), or even combined with parent() like this: bound(parent(?company) -> <urn:hasGroup>).
The IRI parameter can be a full IRI within < > or the special string rdf:type (alternatively, just type), which will be expanded to http://www.w3.org/1999/02/22-rdf-syntax-ns#type.
- Filtering by RDF graph
The construction graph(?var) is used for accessing the RDF graph of a field’s value. A typical use case is to sync only explicit values: graph(?a) not in (<http://www.ontotext.com/implicit>), but using isExplicit(?a) is the recommended way.
The construction can be combined with parent() like this: graph(parent(?a)) in (<urn:a>).
- Filtering by language tags
The construction lang(?var) is used for accessing the language tag of a field’s value (only RDF literals can have a language tag). The typical use case is to sync only values written in a given language: lang(?a) in ("de", "it", "no"). The construction can be combined with parent() and an element beyond the chain like this: lang(parent(?a) -> <http://www.w3.org/2000/01/rdf-schema#label>) in ("en", "bg"). Literal values without language tags can be filtered by using an empty tag: "".
- Current context variable $this
The special field variable $this (and not ?this, ?$this, $?this) is used to refer to the current context. In the top-level value filter and the top-level document filter, it refers to the document. In the per-field value filter, it refers to the currently filtered field value. In the nested document filter, it refers to the nested document.
- ALL() quantifier
In the context of document-level filtering, a match is true if at least one of potentially many field values match, e.g., ?location = <urn:Europe> would return true if the document contains { "location": ["<urn:Asia>", "<urn:Europe>"] }.
In addition to this, you can also use the ALL() quantifier when you need all values to match, e.g., ALL(?location) = <urn:Europe> would not match with the above document because <urn:Asia> does not match.
- Entity filters and default values
Entity filters can be combined with default values in order to get more flexible behavior.
If a field has no values in the RDF database, the defaultValue is used. But if a field has some values, defaultValue is NOT used, even if all values are filtered out. See an example in Basic entity filter.
A typical use case for an entity filter is having soft deletes, i.e., instead of deleting an entity, it is marked as deleted by the presence of a specific value for a given property.
Two-variable filtering¶
Besides comparing a field value to one or more constants or running an existential check on the field value, some use cases also require comparing the field value to the value of another field in order to produce the desired result. GraphDB solves this by supporting two-variable filtering in the per-field value filter, the top-level document filter, and the nested document filter.
Note
This type of filtering is not possible in the top-level value filter because the only variable that is available there is $this
.
In the top-level document filter and the nested document filter, there are no restrictions as all values are available at the time of evaluation.
In the per-field value filter, two-variable filtering will reorder the defined fields such that values for other fields are already available when the current field’s filter is evaluated. For example, let’s say we defined a filter $this > ?salary
for the field price
. This will force the connector to process the field salary
first, apply its per-field value filter if any, and only then start collecting and filtering the values for the field price
.
Cyclic dependencies will be detected and reported as an invalid filter. For example, if in addition to the above we define a per-field value filter ?price > "1000"^^xsd:int
for the field salary
, a cyclic dependency will be detected as both price
and salary
will require the other field being indexed first.
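A sketch of such a field definition is shown below; the property IRIs are hypothetical and only illustrate where the filter goes:
"fields": [
  {
    "fieldName": "salary",
    "propertyChain": ["http://www.ontotext.com/example#salary"]
  },
  {
    "fieldName": "price",
    "propertyChain": ["http://www.ontotext.com/example#price"],
    "valueFilter": "$this > ?salary"
  }
]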
Basic entity filter example¶
Given the following RDF data:
@prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#> .
@prefix example: <http://www.ontotext.com/example#> .
# the entity below will be synchronised because it has a matching value for city: ?city in ("London")
example:alpha
rdf:type example:gadget ;
example:name "John Synced" ;
example:city "London" .
# the entity below will not be synchronised because it lacks the property completely: bound(?city)
example:beta
rdf:type example:gadget ;
example:name "Peter Syncfree" .
# the entity below will not be synchronized because it has a different city value:
# ?city in ("London") will remove the value "Liverpool" so bound(?city) will be false
example:gamma
rdf:type example:gadget ;
example:name "Mary Syncless" ;
example:city "Liverpool" .
If you create a connector instance such as:
PREFIX kafka: <http://www.ontotext.com/connectors/kafka#>
PREFIX kafka-inst: <http://www.ontotext.com/connectors/kafka/instance#>
INSERT DATA {
kafka-inst:my_index kafka:createConnector '''
{
"kafkaNode": "localhost:9092",
"kafkaTopic": "my_index",
"types": ["http://www.ontotext.com/example#gadget"],
"fields": [
{
"fieldName": "name",
"propertyChain": ["http://www.ontotext.com/example#name"]
},
{
"fieldName": "city",
"propertyChain": ["http://www.ontotext.com/example#city"],
"valueFilter": "$this = \\"London\\""
}
],
"documentFilter": "bound(?city)"
}
''' .
}
The entity :beta
is not synchronized as it has no value for city
.
To handle such cases, you can modify the connector configuration to
specify a default value for city
:
...
{
"fieldName": "city",
"propertyChain": ["http://www.ontotext.com/example#city"],
"defaultValue": "London"
}
...
}
The default value is used for the entity :beta
as it has no value for city
in the repository. As the value is “London”, the entity is synchronized.
Advanced entity filter example¶
Sometimes, data represented in RDF is not well suited to map directly to
non-RDF. For example, if you have news articles and they can be tagged
with different concepts (locations, persons, events, etc.), one possible
way to model this is a single property :taggedWith
. Consider the
following RDF data:
@prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> .
@prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#> .
@prefix example2: <http://www.ontotext.com/example2#> .
example2:Berlin
rdf:type example2:Location ;
rdfs:label "Berlin" .
example2:Mozart
rdf:type example2:Person ;
rdfs:label "Wolfgang Amadeus Mozart" .
example2:Einstein
rdf:type example2:Person ;
rdfs:label "Albert Einstein" .
example2:Cannes-FF
rdf:type example2:Event ;
rdfs:label "Cannes Film Festival" .
example2:Article1
rdf:type example2:Article ;
rdfs:comment "An article about a film about Einstein's life while he was a professor in Berlin." ;
example2:taggedWith example2:Berlin ;
example2:taggedWith example2:Einstein ;
example2:taggedWith example2:Cannes-FF .
example2:Article2
rdf:type example2:Article ;
rdfs:comment "An article about Berlin." ;
example2:taggedWith example2:Berlin .
example2:Article3
rdf:type example2:Article ;
rdfs:comment "An article about Mozart's life." ;
example2:taggedWith example2:Mozart .
example2:Article4
rdf:type example2:Article ;
rdfs:comment "An article about classical music in Berlin." ;
example2:taggedWith example2:Berlin ;
example2:taggedWith example2:Mozart .
example2:Article5
rdf:type example2:Article ;
rdfs:comment "A boring article that has no tags." .
example2:Article6
rdf:type example2:Article ;
rdfs:comment "An article about the Cannes Film Festival in 2013." ;
example2:taggedWith example2:Cannes-FF .
Assume you want to map this data to Kafka, so that the property example2:taggedWith x
is mapped to separate fields taggedWithPerson
and taggedWithLocation
, according to the type of x
(whereas we are not interested in Events). You can map taggedWith
twice to different fields
and then use an entity filter to get the desired values:
PREFIX kafka: <http://www.ontotext.com/connectors/kafka#>
PREFIX kafka-inst: <http://www.ontotext.com/connectors/kafka/instance#>
INSERT DATA {
kafka-inst:my_index kafka:createConnector '''
{
"kafkaNode": "localhost:9092",
"kafkaTopic": "my_index",
"types": ["http://www.ontotext.com/example2#Article"],
"fields": [
{
"fieldName": "comment",
"propertyChain": ["http://www.w3.org/2000/01/rdf-schema#comment"]
},
{
"fieldName": "taggedWithPerson",
"propertyChain": ["http://www.ontotext.com/example2#taggedWith"],
"valueFilter": "$this -> type = <http://www.ontotext.com/example2#Person>"
},
{
"fieldName": "taggedWithLocation",
"propertyChain": ["http://www.ontotext.com/example2#taggedWith"],
"valueFilter": "$this -> type = <http://www.ontotext.com/example2#Location>"
}
]
}
''' .
}
Note
type
is the short way to write <http://www.w3.org/1999/02/22-rdf-syntax-ns#type>
.
The six articles in the RDF data above will be mapped as such:
Article IRI | Value in taggedWithPerson | Value in taggedWithLocation | Explanation
---|---|---|---
example2:Article1 | example2:Einstein | example2:Berlin | taggedWith points to Einstein (a Person), Berlin (a Location), and Cannes-FF (an Event, which is not mapped).
example2:Article2 | (none) | example2:Berlin | taggedWith points only to Berlin, a Location.
example2:Article3 | example2:Mozart | (none) | taggedWith points only to Mozart, a Person.
example2:Article4 | example2:Mozart | example2:Berlin | taggedWith points to Mozart (a Person) and Berlin (a Location).
example2:Article5 | (none) | (none) | The article has no taggedWith values, so nothing is indexed in either field.
example2:Article6 | (none) | (none) | taggedWith points only to Cannes-FF, an Event, which is not mapped to either field.
Overview of connector predicates¶
The following diagram shows a summary of all predicates that can administrate (create, drop, check status) connector instances or issue queries and retrieve results. It can be used as a quick reference of what a particular predicate needs to be attached to. Variables that are bound as a result of a query are shown in green, blank helper nodes are shown in blue, literals in red, and IRIs in orange. The predicates are represented by labeled arrows.
Caveats¶
Producer sharing¶
The Kafka connector aims to minimize resource usage and provide smooth transactional operation. This is achieved by using a single Kafka producer object for each connector instance that is connected to the same Kafka broker node. This has the following benefits:
Memory consumption is reduced as each Kafka producer requires a certain amount of buffer memory.
A failed transaction in one Kafka connector instance will be reverted in all other Kafka connector instances together with the GraphDB transaction.
Due to the nature of Kafka producers, this sharing imposes a restriction as well:
All connector instances must use the same Kafka options, e.g., they must have the same values for the
bulkUpdateBatchSize
andkafkaCompressionType
options.
Once you have created at least one Kafka connector instance and attempt to create another instance, the following are possible scenarios:
- Different Kafka broker
The new connector instance specifies a different Kafka broker.
The connector instance will be created and a new Kafka producer will be instantiated.
- Same Kafka broker + same Kafka options
The new connector instance specifies the same Kafka broker as one of the existing connectors and the SAME options as the existing connector.
The connector instance will be created and the existing Kafka producer will be reused.
- Same Kafka broker + different Kafka options
The new connector instance specifies the same Kafka broker as one of the existing connectors and DIFFERENT options than the existing connector.
The connector instance will NOT be created and an error explaining the reason will be thrown.
See Conflict resolution for possible workarounds.
Note
The Kafka broker for two connector instances is considered to be the same if at least one of the host/port
pairs supplied via the kafkaNode
option is the same.
Conflict resolution¶
When the attempt to create a new Kafka connector instance was denied because another instance was already created with different options, there are several possible ways to resolve the conflict:
- Manual resolution
Examine the options of the new connector instance you want to create.
Make the options the same as of the existing connector instance.
- Propagate the new options to the existing instances
Set the option kafkaPropagateConfig of the new instance to
true
.The new options will be propagated to all existing instances that share the same Kafka broker node.
- Force the allocation of a new producer
Set the option kafkaProducerId of the new instance to some non-empty identifier.
This will override the producer sharing mechanism and allocate a new producer associating it with the supplied producer ID.
The new connector will use the new options.
All existing instances will continue using their previous options.
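For example, a second connector to the same broker with different options could force its own producer like this (a sketch; the topic, producer ID, compression choice, and field are only illustrative):
PREFIX kafka: <http://www.ontotext.com/connectors/kafka#>
PREFIX kafka-inst: <http://www.ontotext.com/connectors/kafka/instance#>
INSERT DATA {
    kafka-inst:my_second_index kafka:createConnector '''
{
  "kafkaNode": "localhost:9092",
  "kafkaTopic": "my_second_index",
  "kafkaProducerId": "second-producer",
  "kafkaCompressionType": "gzip",
  "types": [
    "http://www.ontotext.com/example/wine#Wine"
  ],
  "fields": [
    {
      "fieldName": "sugar",
      "propertyChain": [
        "http://www.ontotext.com/example/wine#hasSugar"
      ]
    }
  ]
}
''' .
}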
Upgrading from previous versions¶
Migrating from GraphDB 9.x¶
GraphDB 10.0 introduces major changes to the filtering mechanism of the connectors. Existing connector instances will not be usable and attempting to use them for queries or updates will throw an error.
If your GraphDB 9.x (or older) connector definitions do not include an entity filter, you can simply repair them.
If your GraphDB 9.x (or older) connector definitions do include an entity filter with the entityFilter
option, you need to rewrite the filter with one of the current filter types:
Save your existing connector definition.
Drop the connector instance.
In general, most older connector filters can be easily rewritten using the per-field value filter and top-level document filter. Rewrite the filters as follows:
Rule of thumb:
If you want to remove individual values, i.e., if the operand is not BOUND() -> rewrite with a per-field value filter.
If you want to remove entire documents, i.e., if the operand is BOUND() -> rewrite with a top-level document filter.
So if we take the example:
?location = <urn:Europe> AND BOUND(?location) AND ?type IN (<urn:Foo>, <urn:Bar>)
It needs to be rewritten like this:
Per-field rule on field location: $this = <urn:Europe>
Per-field rule on field type: $this IN (<urn:Foo>, <urn:Bar>)
Top-level document filter: BOUND(?location)
Recreate the connector instance using the new definition.
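For reference, the rewritten parts of the definition could look like this (a sketch; the property chains are hypothetical):
"fields": [
  {
    "fieldName": "location",
    "propertyChain": ["http://www.ontotext.com/example#location"],
    "valueFilter": "$this = <urn:Europe>"
  },
  {
    "fieldName": "type",
    "propertyChain": ["http://www.ontotext.com/example#type"],
    "valueFilter": "$this in (<urn:Foo>, <urn:Bar>)"
  }
],
"documentFilter": "bound(?location)"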