Source Configuration Options
Reference of the source connector options.
Connection
couchbase.seed.nodes
Addresses of Couchbase Server nodes, delimited by commas.
If a custom port is specified, it must be the KV port (which is normally 11210 for insecure connections, or 11207 for secure connections).
-
Type: list
-
Importance: high
couchbase.password
Password of the Couchbase user.
May be overridden with the KAFKA_COUCHBASE_PASSWORD environment variable.
-
Type: password
-
Importance: high
couchbase.bucket
Name of the Couchbase bucket to use.
This property is required unless using the experimental AnalyticsSinkHandler.
-
Type: string
-
Default:
""
-
Importance: high
couchbase.network
The network selection strategy for connecting to a Couchbase Server cluster that advertises alternate addresses.
A Couchbase node running inside a container environment (like Docker or Kubernetes) might be configured to advertise both its address within the container environment (known as its "default" address) as well as an "external" address for use by clients connecting from outside the environment.
Setting the 'couchbase.network' config property to 'default' or 'external' forces the selection of the respective addresses. Setting the value to 'auto' tells the connector to select whichever network contains the addresses specified in the 'couchbase.seed.nodes' config property.
-
Type: string
-
Default:
auto
-
Importance: medium
couchbase.bootstrap.timeout
On startup, the connector will wait this long for a Couchbase connection to be established. If a connection is not established before the timeout expires, the connector will terminate.
-
Type: string
-
Default:
30s
-
Valid Values: An integer followed by a time unit (ms = milliseconds, s = seconds, m = minutes, h = hours, d = days). For example, to specify 30 minutes: 30m
-
Importance: medium
Security
couchbase.enable.tls
Use secure connection to Couchbase Server.
If true, you must also tell the connector which certificate to trust. Specify a certificate file with 'couchbase.trust.certificate.path', or a Java keystore file with 'couchbase.trust.store.path' and 'couchbase.trust.store.password'.
-
Type: boolean
-
Default:
false
-
Importance: medium
-
Dependents:
,couchbase.trust.certificate.path
,couchbase.trust.store.path
,couchbase.trust.store.password
,couchbase.enable.hostname.verification
,couchbase.client.certificate.path
couchbase.client.certificate.password
couchbase.enable.hostname.verification
Set this to false
to disable TLS hostname verification for Couchbase connections.
Less secure, but might be required if for some reason you can’t issue a certificate whose Subject Alternative Names match the hostname used to connect to the server.
Only disable if you understand the impact and can accept the risks.
-
Type: boolean
-
Default:
true
-
Importance: medium
couchbase.trust.store.path
Absolute filesystem path to the Java keystore holding the CA certificate used by Couchbase Server.
If you want to use a PEM file instead of a Java keystore, specify couchbase.trust.certificate.path
instead.
-
Type: string
-
Default:
""
-
Importance: medium
couchbase.trust.store.password
Password for accessing the trust store.
May be overridden with the KAFKA_COUCHBASE_TRUST_STORE_PASSWORD environment variable.
-
Type: password
-
Default:
[hidden]
-
Importance: medium
couchbase.trust.certificate.path
Absolute filesystem path to the PEM file containing the CA certificate used by Couchbase Server.
If you want to use a Java keystore instead of a PEM file, specify couchbase.trust.store.path
instead.
-
Type: string
-
Default:
""
-
Importance: medium
couchbase.client.certificate.path
Absolute filesystem path to a Java keystore or PKCS12 bundle holding the private key and certificate chain to use for client certificate authentication (mutual TLS).
If you supply a value for this config property, the couchbase.username
and couchbase.password
properties will be ignored.
-
Type: string
-
Default:
""
-
Importance: medium
Logging
couchbase.log.redaction
Determines which kinds of sensitive log messages from the Couchbase connector will be tagged for later redaction by the Couchbase log redaction tool. NONE = no tagging; PARTIAL = user data is tagged; FULL = user, meta, and system data is tagged.
-
Type: string
-
Default:
NONE
-
Valid Values: One of [NONE, PARTIAL, FULL]
-
Importance: medium
couchbase.log.document.lifecycle
If true, document lifecycle milestones will be logged at INFO level instead of DEBUG. Enabling this feature lets you watch documents flow through the connector. Disabled by default because it generates many log messages.
-
Type: boolean
-
Default:
false
-
Importance: medium
couchbase.metrics.interval
The connector writes metrics to the log at this interval.
Disable metric logging by setting this to 0
.
UNCOMMITTED; this feature may change in a patch release without notice.
-
Since: 4.2.3
-
Type: string
-
Default:
10m
-
Valid Values: An integer followed by a time unit (ms = milliseconds, s = seconds, m = minutes, h = hours, d = days). For example, to specify 30 minutes: 30m
-
Importance: medium
Source Behavior
couchbase.topic
Name of the default Kafka topic to publish data to, for collections that don’t have an entry in the couchbase.collection.to.topic
map.
This is a format string that recognizes the following placeholders:
${bucket} refers to the bucket containing the document.
${scope} refers to the scope containing the document.
${collection} refers to the collection containing the document.
-
Type: string
-
Default:
${bucket}.${scope}.${collection}
-
Importance: medium
couchbase.collection.to.topic
A map from Couchbase collection to Kafka topic.
Collection and Topic are joined by an equals sign. Map entries are delimited by commas.
For example, if you want to write messages from collection "scope-a.invoices" to topic "topic1", and messages from collection "scope-a.widgets" to topic "topic2", you would write: "scope-a.invoices=topic1,scope-a.widgets=topic2".
Defaults to an empty map.
For collections not present in this map, the destination topic is determined by the couchbase.topic
config property.
-
Since: 4.1.8
-
Type: list
-
Default:
""
-
Valid Values: scope.collection=topic,…
-
Importance: medium
couchbase.source.handler
The fully-qualified class name of the source handler to use. The source handler determines how the Couchbase document is converted into a Kafka record.
To publish JSON messages identical to the Couchbase documents, use com.couchbase.connect.kafka.handler.source.RawJsonSourceHandler
and set value.converter
to org.apache.kafka.connect.converters.ByteArrayConverter
.
When using a custom source handler that filters out certain messages, consider also configuring couchbase.black.hole.topic
.
See that property’s documentation for details.
-
Type: class
-
Importance: medium
couchbase.headers
Comma-delimited list of Couchbase metadata headers to add to records. Recognized values:
-
bucket
- Name of the bucket the document came from. -
scope
- Name of the scope the document came from. -
collection
- Name of the collection the document came from. -
key
- The Couchbase document ID. -
qualifiedKey
- The document’s scope, collection, and document ID, delimited by dots. Example:myScope.myCollection.myDocumentId
-
cas
- The document’s "compare and swap" value. -
partition
- The index of the Couchbase partition the document came from. -
partitionUuid
- Identifies the history branch of the partition the document came from. -
seqno
- The DCP sequence number of the event. -
rev
- The revision number of the event. -
expiry
- The epoch second when the document expires, or null if the document has no expiry (or if the event is a deletion).
UNCOMMITTED; this feature may change in a patch release without notice.
-
Since: 4.2.5
-
Type: list
-
Default:
""
-
Valid Values: Zero or more of [bucket, scope, collection, key, qualifiedKey, cas, partition, partitionUuid, seqno, rev, expiry]
-
Importance: medium
couchbase.header.name.prefix
The connector prepends this value to header names to prevent collision with headers set by other parts of the system.
For example, if couchbase.headers
is set to bucket,qualifiedKey
and header.name.prefix
is set to example.
then records will have headers named example.bucket
and example.qualifiedKey
.
UNCOMMITTED; this feature may change in a patch release without notice.
-
Since: 4.2.5
-
Type: string
-
Default:
couchbase.
-
Importance: medium
couchbase.event.filter
The class name of the event filter to use. The event filter determines whether a database change event is ignored.
As of version 4.2.4, the default filter ignores events from the Couchbase _system
scope.
If you are interested in those events too, set this property to com.couchbase.connect.kafka.filter.AllPassIncludingSystemFilter
.
See also couchbase.black.hole.topic
.
-
Type: class
-
Default:
com.couchbase.connect.kafka.filter.AllPassFilter
-
Importance: medium
couchbase.black.hole.topic
If this property is non-blank, the connector publishes a tiny synthetic record to this topic whenever the Filter or SourceHandler ignores a source event.
This lets the connector tell the Kafka Connect framework about the source offset of the ignored event. Otherwise, a long sequence of ignored events in a low-traffic deployment might cause the stored source offset to lag too far behind the current source offset, which can lead to rollbacks to zero when the connector is restarted.
After a record is published to this topic, the record is no longer important, and should be deleted as soon as possible. To reduce disk usage, configure this topic to use small segments and the lowest possible retention settings.
-
Since: 4.1.8
-
Type: string
-
Default:
""
-
Importance: medium
couchbase.initial.offset.topic
If couchbase.stream.from
is SAVED_OFFSET_OR_NOW
, and this property is non-blank, on startup the connector publishes to the named topic one tiny synthetic record for each source partition that does not yet have a saved offset.
This lets the connector initialize the missing source offsets to "now" (the current state of Couchbase).
The synthetic records have a value of null, and the same key: COUCHBASE_INITIAL_OFFSET_TOMBSTONEa54ee32b-4a7e-4d98-aa36-45d8417e942a
.
Consumers of this topic must ignore (or tolerate) these records.
If you specify a value for couchbase.black.hole.topic
, specify the same value here.
UNCOMMITTED; this feature may change in a patch release without notice.
-
Since: 4.2.4
-
Type: string
-
Default:
""
-
Importance: medium
couchbase.batch.size.max
Controls maximum size of the batch for writing into topic.
-
Type: int
-
Default:
2000
-
Importance: medium
couchbase.no.value
If true, Couchbase Server will omit the document content when telling the connector about a change. The document key and metadata will still be present.
If you don’t care about the content of changed documents, enabling this option is a great way to reduce the connector’s network bandwidth and memory usage.
-
Type: boolean
-
Default:
false
-
Importance: medium
couchbase.connector.name.in.offsets
When true, the connector’s offsets are saved under a key that includes the connector name. This is redundant, since the Kafka Connect framework already isolates the offsets of connectors with different names.
Set this to true only if you’ve previously deployed the connector to production with this set to true, and you do not wish to restart streaming from the beginning. Otherwise you should ignore this property.
-
Type: boolean
-
Default:
false
-
Importance: medium
couchbase.stream.from
Controls when in the history the connector starts streaming from.
-
Type: string
-
Default:
SAVED_OFFSET_OR_BEGINNING
-
Valid Values: One of [SAVED_OFFSET_OR_BEGINNING, SAVED_OFFSET_OR_NOW, BEGINNING, NOW]
-
Importance: medium
couchbase.scope
If you wish to stream from all collections within a scope, specify the scope name here.
If you specify neither "couchbase.scope" nor "couchbase.collections", the connector will stream from all collections of all scopes in the bucket.
Requires Couchbase Server 7.0 or later.
-
Type: string
-
Default:
""
-
Importance: medium
couchbase.collections
If you wish to stream from specific collections, specify the qualified collection names here, separated by commas. A qualified name is the name of the scope followed by a dot (.) and then the name of the collection. For example: "tenant-foo.invoices".
If you specify neither "couchbase.scope" nor "couchbase.collections", the connector will stream from all collections of all scopes in the bucket.
Requires Couchbase Server 7.0 or later.
-
Type: list
-
Default:
""
-
Importance: medium
Database Change Protocol
couchbase.compression
To reduce bandwidth usage, Couchbase Server 5.5 and later can send documents to the connector in compressed form. (Messages are always published to the Kafka topic in uncompressed form, regardless of this setting.)
-
Type: string
-
Default:
ENABLED
-
Valid Values: One of [DISABLED, FORCED, ENABLED]
-
Importance: medium
couchbase.persistence.polling.interval
When a Couchbase Server node fails over, documents on the failing node that haven’t been fully replicated may be "rolled back" to a previous state. To ensure consistency between Couchbase and the Kafka topic, the connector can defer publishing a document to Kafka until it has been saved to disk on all replicas.
To enable this feature, specify a non-zero persistence polling interval. The interval is how frequently the connector asks each Couchbase node which changes have been fully replicated and persisted. This ensures consistency between Couchbase and Kafka, at the cost of additional latency and bandwidth usage.
To disable this feature, specify a zero duration (0
).
In this mode the connector publishes changes to Kafka immediately, without waiting for replication.
This is fast and uses less network bandwidth, but can result in publishing "phantom changes" that don’t reflect the actual state of a document in Couchbase after a failover.
Documents written to Couchbase with enhanced durability are never published to Kafka until the durability requirements are met, regardless of whether persistence polling is enabled. |
When connecting to an ephemeral bucket, always disable persistence polling by setting this config option to 0 , otherwise the connector will never publish any changes.
|
-
Type: string
-
Default:
100ms
-
Valid Values: An integer followed by a time unit (ms = milliseconds, s = seconds, m = minutes, h = hours, d = days). For example, to specify 30 minutes: 30m
-
Importance: medium
couchbase.flow.control.buffer
The flow control buffer limits how much data Couchbase will send before waiting for the connector to acknowledge the data has been processed. The recommended size is between 10 MiB ("10m") and 50 MiB ("50m").
Make sure to allocate enough memory to the Kafka Connect worker process to accommodate the flow control buffer, otherwise the connector might run out of memory under heavy load. Read on for details. |
There’s a separate buffer for each node in the Couchbase cluster. When calculating how much memory to allocate to the Kafka Connect worker, multiply the flow control buffer size by the number of Couchbase nodes, then multiply by 2. This is how much memory a single connector task requires for the flow control buffer (not counting the connector’s baseline memory usage).
-
Type: string
-
Default:
16m
-
Valid Values: An integer followed by a size unit (b = bytes, k = kilobytes, m = megabytes, g = gigabytes). For example, to specify 64 megabytes: 64m
-
Importance: medium
couchbase.xattrs
Should filters and source handlers have access to a document’s extended attributes?
-
Type: boolean
-
Default:
false
-
Importance: medium
couchbase.enable.dcp.trace
If true, detailed protocol trace information is logged to the com.couchbase.client.dcp.trace
category at INFO level.
Otherwise, trace information is not logged.
Disabled by default because it generates many log messages.
-
Since: 4.1.6
-
Type: boolean
-
Default:
false
-
Importance: medium
-
Dependents:
couchbase.dcp.trace.document.id.regex
couchbase.dcp.trace.document.id.regex
When DCP trace is enabled, set this property to limit the trace to only documents whose IDs match this Java regular expression.
Ignored if couchbase.enable.dcp.trace
is false.
-
Since: 4.1.6
-
Type: string
-
Default:
.*
-
Importance: medium
Parent topic: Kafka Connector
Previous topic: Quickstart
Next topic: Sink Configuration Options