Skip to main content

Kafka

Extract Topics & Schemas from Apache Kafka or Confluent Cloud.

Module kafka

Certified

Important Capabilities

CapabilityStatusNotes
Platform InstanceFor multiple Kafka clusters, use the platform_instance configuration
Schema MetadataSchemas associated with each topic are extracted from the schema registry. Avro and Protobuf (certified), JSON (incubating). Schema references are supported.

This plugin extracts the following:

  • Topics from the Kafka broker
  • Schemas associated with each topic from the schema registry (Avro, Protobuf and JSON schemas are supported)

CLI based Ingestion

Install the Plugin

pip install 'acryl-datahub[kafka]'

Starter Recipe

Check out the following recipe to get started with ingestion! See below for full configuration options.

For general pointers on writing and running a recipe, see our main recipe guide.

source:
type: "kafka"
config:
platform_instance: "YOUR_CLUSTER_ID"
connection:
bootstrap: "broker:9092"
schema_registry_url: http://localhost:8081

sink:
# sink configs


Config Details

Note that a . is used to denote nested fields in the YAML recipe.

View All Configuration Options
Field [Required]TypeDescriptionDefaultNotes
ignore_warnings_on_schema_typebooleanDisables warnings reported for non-AVRO/Protobuf value or key schemas if set.None
platform_instancestringThe instance of the platform that all assets produced by this recipe belong toNone
schema_registry_classstringThe fully qualified implementation class(custom) that implements the KafkaSchemaRegistryBase interface.datahub.ingestion.source.confluent_schema_registry.ConfluentSchemaRegistry
topic_subject_mapmap(str,string)None
envstringThe environment that all assets produced by this connector belong toPROD
connectionKafkaConsumerConnectionConfig{'bootstrap': 'localhost:9092', 'schema_registry_url': 'http://localhost:8081', 'schema_registry_config': {}, 'client_timeout_seconds': 60, 'consumer_config': {}}
connection.bootstrapstringlocalhost:9092
connection.client_timeout_secondsintegerThe request timeout used when interacting with the Kafka APIs.60
connection.consumer_configobjectExtra consumer config serialized as JSON. These options will be passed into Kafka's DeserializingConsumer. See https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#deserializingconsumer and https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md .None
connection.schema_registry_configobjectExtra schema registry config serialized as JSON. These options will be passed into Kafka's SchemaRegistryClient. https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html?#schemaregistryclientNone
connection.schema_registry_urlstringhttp://localhost:8081
domainmap(str,AllowDenyPattern)A class to store allow deny regexesNone
domain.key.allowarray(string)None
domain.key.denyarray(string)None
domain.key.ignoreCasebooleanWhether to ignore case sensitivity during pattern matching.True
topic_patternsAllowDenyPattern{'allow': ['.*'], 'deny': ['^_.*'], 'ignoreCase': True}
topic_patterns.allowarray(string)None
topic_patterns.denyarray(string)None
topic_patterns.ignoreCasebooleanWhether to ignore case sensitivity during pattern matching.True
stateful_ingestionStatefulStaleMetadataRemovalConfigBase specialized config for Stateful Ingestion with stale metadata removal capability.None
stateful_ingestion.enabledbooleanThe type of the ingestion state provider registered with datahub.None
stateful_ingestion.ignore_new_statebooleanIf set to True, ignores the current checkpoint state.None
stateful_ingestion.ignore_old_statebooleanIf set to True, ignores the previous checkpoint state.None
stateful_ingestion.remove_stale_metadatabooleanSoft-deletes the entities present in the last successful run but missing in the current run with stateful_ingestion enabled.True
note

Stateful Ingestion is available only when a Platform Instance is assigned to this source.

Connecting to Confluent Cloud

If using Confluent Cloud you can use a recipe like this. In this consumer_config.sasl.username and consumer_config.sasl.password are the API credentials that you get (in the Confluent UI) from your cluster -> Data Integration -> API Keys. schema_registry_config.basic.auth.user.info has API credentials for Confluent schema registry which you get (in Confluent UI) from Schema Registry -> API credentials.

When creating API Key for the cluster ensure that the ACLs associated with the key are set like below. This is required for DataHub to read topic metadata from topics in Confluent Cloud.

Topic Name = *
Permission = ALLOW
Operation = DESCRIBE
Pattern Type = LITERAL
source:
type: "kafka"
config:
platform_instance: "YOUR_CLUSTER_ID"
connection:
bootstrap: "abc-defg.eu-west-1.aws.confluent.cloud:9092"
consumer_config:
security.protocol: "SASL_SSL"
sasl.mechanism: "PLAIN"
sasl.username: "${CLUSTER_API_KEY_ID}"
sasl.password: "${CLUSTER_API_KEY_SECRET}"
schema_registry_url: "https://abc-defgh.us-east-2.aws.confluent.cloud"
schema_registry_config:
basic.auth.user.info: "${REGISTRY_API_KEY_ID}:${REGISTRY_API_KEY_SECRET}"

sink:
# sink configs

If you are trying to add domains to your topics you can use a configuration like below.

source:
type: "kafka"
config:
# ...connection block
domain:
"urn:li:domain:13ae4d85-d955-49fc-8474-9004c663a810":
allow:
- ".*"
"urn:li:domain:d6ec9868-6736-4b1f-8aa6-fee4c5948f17":
deny:
- ".*"

Note that the domain in config above can be either an urn or a domain id (i.e. urn:li:domain:13ae4d85-d955-49fc-8474-9004c663a810 or simply 13ae4d85-d955-49fc-8474-9004c663a810). The Domain should exist in your DataHub instance before ingesting data into the Domain. To create a Domain on DataHub, check out the Domains User Guide.

If you are using a non-default subject naming strategy in the schema registry, such as RecordNameStrategy, the mapping for the topic's key and value schemas to the schema registry subject names should be provided via topic_subject_map as shown in the configuration below.

source:
type: "kafka"
config:
# ...connection block
# Defines the mapping for the key & value schemas associated with a topic & the subject name registered with the
# kafka schema registry.
topic_subject_map:
# Defines both key & value schema for topic 'my_topic_1'
"my_topic_1-key": "io.acryl.Schema1"
"my_topic_1-value": "io.acryl.Schema2"
# Defines only the value schema for topic 'my_topic_2' (the topic doesn't have a key schema).
"my_topic_2-value": "io.acryl.Schema3"

Custom Schema Registry

The Kafka Source uses the schema registry to figure out the schema associated with both key and value for the topic. By default it uses the Confluent's Kafka Schema registry and supports the AVRO and PROTOBUF schema types.

If you're using a custom schema registry, or you are using schema type other than AVRO or PROTOBUF, then you can provide your own custom implementation of the KafkaSchemaRegistryBase class, and implement the get_schema_metadata(topic, platform_urn) method that given a topic name would return object of SchemaMetadata containing schema for that topic. Please refer datahub.ingestion.source.confluent_schema_registry::ConfluentSchemaRegistry for sample implementation of this class.

class KafkaSchemaRegistryBase(ABC):
@abstractmethod
def get_schema_metadata(
self, topic: str, platform_urn: str
) -> Optional[SchemaMetadata]:
pass

The custom schema registry class can be configured using the schema_registry_class config param of the kafka source as shown below.

source:
type: "kafka"
config:
# Set the custom schema registry implementation class
schema_registry_class: "datahub.ingestion.source.confluent_schema_registry.ConfluentSchemaRegistry"
# Coordinates
connection:
bootstrap: "broker:9092"
schema_registry_url: http://localhost:8081

# sink configs

Limitations of PROTOBUF schema types implementation

The current implementation of the support for PROTOBUF schema type has the following limitations:

  • Recursive types are not supported.
  • If the schemas of different topics define a type in the same package, the source would raise an exception.

In addition to this, maps are represented as arrays of messages. The following message,

message MessageWithMap {
map<int, string> map_1 = 1;
}

becomes:

message Map1Entry {
int key = 1;
string value = 2/
}
message MessageWithMap {
repeated Map1Entry map_1 = 1;
}

Code Coordinates

  • Class Name: datahub.ingestion.source.kafka.KafkaSource
  • Browse on GitHub

Questions

If you've got any questions on configuring ingestion for Kafka, feel free to ping us on our Slack