How to use

Before we start - Limitations

A few limitations to keep in mind before you start:

Topic names must be unique

Topic names in the cluster must be unique. If you use one repository for several teams or users, adopt a strong naming convention to avoid collisions.
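
One way to catch collisions before deploying is to check the combined list of topic names for duplicates. The sketch below is only an illustration: the find_duplicate_topics helper and the "<team>.<domain>.<name>" naming convention are assumptions and are not part of the tool.

```python
from collections import Counter


def find_duplicate_topics(names):
    """Return the topic names that appear more than once, sorted."""
    counts = Counter(names)
    return sorted(name for name, count in counts.items() if count > 1)


# Hypothetical names following a "<team>.<domain>.<name>" convention.
topics = [
    "payments.orders.created",
    "billing.orders.created",
    "payments.orders.created",
]
duplicates = find_duplicate_topics(topics)
```

Prefixing names with the owning team keeps two teams from accidentally claiming the same topic, and a check like this could run before rendering the template.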

Some properties are not updatable

Some topic properties cannot be updated, and others can be updated only under certain conditions. The most common example is cleanup.policy, which defaults to delete. You cannot change the value of this property on an existing topic. Instead, you must delete the topic first and re-create it.

Tip

To delete and re-create a topic with this tool, comment it out, render, and deploy. Then un-comment it with the new settings, and render and deploy again.
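
A change that requires the delete and re-create cycle can be spotted by comparing the deployed settings with the desired ones. This is a minimal sketch: the needs_recreate helper is hypothetical, and it only covers cleanup.policy, the one property named above.

```python
def needs_recreate(current_settings, desired_settings, default="delete"):
    """Return True when cleanup.policy differs between the two settings dicts.

    A missing key is treated as the documented default, "delete".
    """
    current = current_settings.get("cleanup.policy", default)
    desired = desired_settings.get("cleanup.policy", default)
    return current != desired


# The deployed topic uses the default policy; the new settings ask for compaction.
must_recreate = needs_recreate({}, {"cleanup.policy": "compact"})
```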

Input and Configuration files definitions

The tool accepts a configuration file that overrides various settings and injects them into the final CloudFormation template.

Parameters

Generic properties used to connect to the Kafka cluster.

properties

  • BootstrapServers

#/definitions/BootstrapServers

  • SecurityProtocol

#/definitions/SecurityProtocol

  • SASLMechanism

#/definitions/SASLMechanism

  • SASLUsername

#/definitions/SASLUsername

  • SASLPassword

#/definitions/SASLPassword

  • RegistryUrl

#/definitions/RegistryUrl

  • RegistryUsername

#/definitions/RegistryUsername

  • RegistryPassword

#/definitions/RegistryPassword

  • CompatibilityMode

ews-kafka-schema.json#/definitions/CompatibilityMode

definitions

  • BootstrapServers

Endpoint URL of the Kafka cluster in the format hostname:port

type

string

  • SecurityProtocol

Kafka Security Protocol.

type

string

enum

PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL

default

PLAINTEXT

  • SASLMechanism

Kafka SASL mechanism for Authentication

type

string

enum

PLAIN, GSSAPI, OAUTHBEARER, SCRAM-SHA-256, SCRAM-SHA-512

default

PLAIN

  • SASLUsername

Kafka SASL username for Authentication

type

string

default

  • SASLPassword

Kafka SASL password for Authentication

type

string

default

  • RegistryUrl

Schema registry URL

type

string

  • RegistryUsername

Schema registry username

type

string

  • RegistryPassword

Schema registry password

type

string

  • RegistryUserInfo

The username and password together in the form of username:password

type

string
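
As an illustration of the definitions above, the following sketch fills in the documented defaults for SecurityProtocol and SASLMechanism and checks both against their enums. The check_parameters helper, the broker hostname, and the credentials are placeholders, not values or functions provided by the tool.

```python
SECURITY_PROTOCOLS = {"PLAINTEXT", "SSL", "SASL_PLAINTEXT", "SASL_SSL"}
SASL_MECHANISMS = {"PLAIN", "GSSAPI", "OAUTHBEARER", "SCRAM-SHA-256", "SCRAM-SHA-512"}


def check_parameters(params):
    """Return (params with defaults applied, list of enum violations)."""
    # Documented defaults: PLAINTEXT and PLAIN.
    resolved = {"SecurityProtocol": "PLAINTEXT", "SASLMechanism": "PLAIN", **params}
    errors = []
    if resolved["SecurityProtocol"] not in SECURITY_PROTOCOLS:
        errors.append(f"Unknown SecurityProtocol: {resolved['SecurityProtocol']}")
    if resolved["SASLMechanism"] not in SASL_MECHANISMS:
        errors.append(f"Unknown SASLMechanism: {resolved['SASLMechanism']}")
    return resolved, errors


# Hypothetical connection settings; BootstrapServers uses the hostname:port format.
resolved, errors = check_parameters(
    {
        "BootstrapServers": "broker.example.internal:9092",
        "SecurityProtocol": "SASL_SSL",
        "SASLUsername": "placeholder-user",
        "SASLPassword": "placeholder-password",
    }
)
```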

Input and configuration

type

object

properties

  • Globals

ews-kafka-parameters.json

  • Topics

type

object

properties

  • Topics

type

array

items

ews-kafka-topic.json

  • ReplicationFactor

ews-kafka-topic.json#/definitions/ReplicationFactor

  • FunctionName

Name or ARN of the Lambda function to use for Custom::KafkaTopic

type

string

  • DeletionPolicy

type

string

enum

Retain, Delete

default

Retain

  • ImportExisting

Whether to import topics that already exist on Create. If set to false, creation fails when the topic already exists.

type

boolean

default

True

  • ACLs

type

object

properties

  • Policies

ews-kafka-acl.json#/properties/Policies

  • FunctionName

Name or ARN of the Lambda function to use for Custom::KafkaACL

type

string

  • Schemas

type

object

properties

  • FunctionName

Name or ARN of the Schema Registry function to use

type

string

  • RegistryUrl

type

string

  • RegistryUsername

ews-kafka-parameters.json#/definitions/RegistryUsername

  • RegistryPassword

ews-kafka-parameters.json#/definitions/RegistryPassword

  • RegistryUserInfo

ews-kafka-parameters.json#/definitions/RegistryUserInfo

  • CompatibilityMode

ews-kafka-schema.json#/definitions/CompatibilityMode

  • DeletionPolicy

When set, overrides the DeletionPolicy set as default for all schemas

type

string

enum

Retain, Delete

default

Retain

  • S3Store

type

object

properties

  • BucketName

type

string

format

hostname

  • PrefixPath

type

string

pattern

^[^/](.*)/$

default

additionalProperties

False

definitions

  • SchemasDef

ews-kafka-schema.json

  • AclsModel

ews-kafka-acl.json
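
To make the layout above easier to picture, here is a sketch of an input document written as a Python dictionary, with the documented defaults for the Topics section applied. All names and values are placeholders, the with_topics_defaults helper is hypothetical, and the regular expression is the PrefixPath pattern listed above.

```python
import re

# PrefixPath pattern from the schema: no leading slash, trailing slash required.
PREFIX_PATH_PATTERN = re.compile(r"^[^/](.*)/$")

# Documented defaults for the Topics section.
TOPICS_DEFAULTS = {"DeletionPolicy": "Retain", "ImportExisting": True}

# Hypothetical input; every name and value here is a placeholder.
config = {
    "Globals": {"BootstrapServers": "broker.example.internal:9092"},
    "Topics": {
        "ReplicationFactor": 3,
        "Topics": [{"Name": "orders.created", "PartitionsCount": 6}],
    },
    "ACLs": {"Policies": []},
    "Schemas": {"CompatibilityMode": "BACKWARD"},
}


def with_topics_defaults(section):
    """Return a copy of the Topics section with the documented defaults filled in."""
    return {**TOPICS_DEFAULTS, **section}


topics_section = with_topics_defaults(config["Topics"])
```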

ACLs

Resource to create Kafka ACLs in your cluster.

properties

  • Policies

type

array

items

#/definitions/PolicyDef

uniqueItems

True

definitions

  • PolicyDef

type

object

properties

  • Resource

Name of the resource to apply the ACL for

type

string

  • PatternType

Pattern type for resource value

type

string

enum

LITERAL, PREFIXED, MATCH

pattern

^[A-Z]+$

default

LITERAL

  • Principal

Kafka user to apply the ACLs for.

type

string

  • ResourceType

Type of Kafka resource to apply the ACL to.

type

string

enum

CLUSTER, DELEGATION_TOKEN, GROUP, TOPIC, TRANSACTIONAL_ID

  • Action

Access action allowed.

type

string

enum

ALL, READ, WRITE, CREATE, DELETE, ALTER, DESCRIBE, CLUSTER_ACTION, DESCRIBE_CONFIGS, ALTER_CONFIGS, IDEMPOTENT_WRITE

  • Effect

Effect for the ACL.

type

string

enum

DENY, ALLOW

  • Host

Specify the host for the ACL. Defaults to ‘*’

type

string

default

*

additionalProperties

False
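
A single policy entry could be checked against the enums above as follows. This is a sketch under assumptions: normalize_policy is a hypothetical helper, and the principal and resource names are placeholders.

```python
PATTERN_TYPES = {"LITERAL", "PREFIXED", "MATCH"}
RESOURCE_TYPES = {"CLUSTER", "DELEGATION_TOKEN", "GROUP", "TOPIC", "TRANSACTIONAL_ID"}
ACTIONS = {
    "ALL", "READ", "WRITE", "CREATE", "DELETE", "ALTER", "DESCRIBE",
    "CLUSTER_ACTION", "DESCRIBE_CONFIGS", "ALTER_CONFIGS", "IDEMPOTENT_WRITE",
}
EFFECTS = {"DENY", "ALLOW"}


def normalize_policy(policy):
    """Fill in the documented defaults and reject values outside the enums."""
    # Documented defaults: PatternType LITERAL and Host "*".
    result = {"PatternType": "LITERAL", "Host": "*", **policy}
    checks = {
        "PatternType": PATTERN_TYPES,
        "ResourceType": RESOURCE_TYPES,
        "Action": ACTIONS,
        "Effect": EFFECTS,
    }
    for key, allowed in checks.items():
        if key in result and result[key] not in allowed:
            raise ValueError(f"{key}={result[key]!r} is not one of {sorted(allowed)}")
    return result


# Hypothetical policy: allow a service to read every topic prefixed with "orders.".
policy = normalize_policy(
    {
        "Resource": "orders.",
        "PatternType": "PREFIXED",
        "Principal": "User:orders-service",
        "ResourceType": "TOPIC",
        "Action": "READ",
        "Effect": "ALLOW",
    }
)
```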

Topics

Resource to create Kafka topics in your cluster.

properties

  • Name

#/definitions/Name

  • PartitionsCount

#/definitions/PartitionsCount

  • ReplicationFactor

#/definitions/ReplicationFactor

  • BootstrapServers

ews-kafka-parameters.json#/definitions/BootstrapServers

  • SecurityProtocol

ews-kafka-parameters.json#/definitions/SecurityProtocol

  • SASLMechanism

ews-kafka-parameters.json#/definitions/SASLMechanism

  • SASLUsername

ews-kafka-parameters.json#/definitions/SASLUsername

  • SASLPassword

ews-kafka-parameters.json#/definitions/SASLPassword

  • Schema

#/definitions/TopicSchemas

  • Settings

#/definitions/TopicsSettings

definitions

  • Name

Kafka topic name

type

string

minLength

1

pattern

^[a-zA-Z0-9_.-]+$

  • PartitionsCount

Number of partitions for the new Kafka topic

type

integer

default

1

  • ReplicationFactor

Kafka topic replication factor

type

integer

default

3

  • DeletionPolicy

When set, overrides the DeletionPolicy set on the Topic of the schema. For safety, defaults to Retain.

type

string

enum

Retain, Delete

default

Retain

  • TopicSchemas

type

object

properties

  • Key

ews-kafka-schema.json#/definitions/TopicSchemaDef

  • Value

ews-kafka-schema.json#/definitions/TopicSchemaDef

  • Header

ews-kafka-schema.json#/definitions/TopicSchemaDef

  • DeletionPolicy

When set, overrides the DeletionPolicy set on the Topic of the schema. For safety, defaults to Retain.

type

string

enum

Retain, Delete

default

Retain

  • TopicsSettings

type

object

properties

  • cleanup.policy

https://docs.confluent.io/platform/current/installation/configuration/topic-configs.html#topicconfigs_cleanup.policy

type

string

enum

compact, delete, "compact,delete" (the last entry is a single value)

default

delete

  • compression.type

https://docs.confluent.io/platform/current/installation/configuration/topic-configs.html#topicconfigs_compression.type

type

string

enum

uncompressed, zstd, lz4, snappy, gzip, producer

default

producer

  • delete.retention.ms

https://docs.confluent.io/platform/current/installation/configuration/topic-configs.html#topicconfigs_delete.retention.ms

type

integer

minimum

0

default

86400000

  • file.delete.delay.ms

https://docs.confluent.io/platform/current/installation/configuration/topic-configs.html#topicconfigs_file.delete.delay.ms

type

integer

minimum

0

default

60000

  • retention.ms

https://docs.confluent.io/platform/current/installation/configuration/topic-configs.html#topicconfigs_retention.ms

type

integer

minimum

-1

default

604800000

  • flush.messages

https://docs.confluent.io/platform/current/installation/configuration/topic-configs.html#topicconfigs_flush.messages

type

integer

minimum

0

default

9223372036854775807

  • flush.ms

https://docs.confluent.io/platform/current/installation/configuration/topic-configs.html#topicconfigs_flush.ms

type

integer

minimum

0

default

9223372036854775807

  • index.interval.bytes

https://docs.confluent.io/platform/current/installation/configuration/topic-configs.html#topicconfigs_index.interval.bytes

type

integer

minimum

0

default

4096

  • max.compaction.lag.ms

https://docs.confluent.io/platform/current/installation/configuration/topic-configs.html#topicconfigs_max.compaction.lag.ms

type

integer

minimum

0

default

9223372036854775807

  • min.compaction.lag.ms

https://docs.confluent.io/platform/current/installation/configuration/topic-configs.html#topicconfigs_min.compaction.lag.ms

type

integer

minimum

0

  • min.cleanable.dirty.ratio

https://docs.confluent.io/platform/current/installation/configuration/topic-configs.html#topicconfigs_min.cleanable.dirty.ratio

type

number

maximum

1.0

minimum

0.0

default

0.5

  • max.message.bytes

https://docs.confluent.io/platform/current/installation/configuration/topic-configs.html#topicconfigs_max.message.bytes

type

integer

minimum

0

default

1048588

  • confluent.key.schema.validation

https://docs.confluent.io/platform/current/installation/configuration/topic-configs.html#topicconfigs_confluent.key.schema.validation

type

boolean

  • confluent.key.subject.name.strategy

https://docs.confluent.io/platform/current/installation/configuration/topic-configs.html#topicconfigs_confluent.key.subject.name.strategy

type

string

  • confluent.tier.enable

https://docs.confluent.io/platform/current/installation/configuration/topic-configs.html#topicconfigs_confluent.tier.enable

type

string
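
The Name pattern and the millisecond-based settings above can be exercised with a short sketch. The helpers is_valid_topic_name and days_to_ms are hypothetical conveniences, and the topic shown is a placeholder.

```python
import re

# Name pattern from the schema; fullmatch also rejects a trailing newline.
TOPIC_NAME_PATTERN = re.compile(r"[a-zA-Z0-9_.-]+")
MS_PER_DAY = 24 * 60 * 60 * 1000


def is_valid_topic_name(name):
    """Return True when the name satisfies the documented pattern and minLength."""
    return TOPIC_NAME_PATTERN.fullmatch(name) is not None


def days_to_ms(days):
    """Convert a number of days into milliseconds for settings such as retention.ms."""
    return int(days * MS_PER_DAY)


# Hypothetical topic definition using properties from the schema above.
topic = {
    "Name": "orders.created",
    "PartitionsCount": 6,
    "ReplicationFactor": 3,
    "Settings": {
        "cleanup.policy": "delete",
        "retention.ms": days_to_ms(7),  # 604800000, the documented default
    },
}
```

Writing durations as days_to_ms(7) rather than a raw integer makes it easier to review retention values at a glance.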

Schema

Note that schemas only work with the topic name strategy. The resulting subjects are named ${topic.name}-[key|value].

Resource to create schemas in your schema registry.
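
The ${topic.name}-[key|value] naming can be sketched as a small function. The subject_name helper is hypothetical and only mirrors the naming rule stated in the note.

```python
def subject_name(topic_name, part):
    """Build the subject name for a topic's key or value schema."""
    if part not in ("key", "value"):
        raise ValueError("part must be 'key' or 'value'")
    return f"{topic_name}-{part}"


# Hypothetical topic name.
value_subject = subject_name("orders.created", "value")
key_subject = subject_name("orders.created", "key")
```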

properties

  • RegistryUrl

ews-kafka-parameters.json#/definitions/RegistryUrl

  • RegistryUsername

ews-kafka-parameters.json#/definitions/RegistryUsername

  • RegistryPassword

ews-kafka-parameters.json#/definitions/RegistryPassword

  • RegistryUserInfo

ews-kafka-parameters.json#/definitions/RegistryUserInfo

  • Subject

type

string

  • Serializer

#/definitions/SerializerDef

  • Definition

type

string / object

  • CompatibilityMode

#/definitions/CompatibilityMode

  • ServiceToken

The Lambda Function ARN

type

string

  • PermanentlyDelete

If set to true, the schema is hard deleted. Use with care.

type

boolean

default

False

definitions

  • CompatibilityMode

type

string

enum

BACKWARD, BACKWARD_TRANSITIVE, FORWARD, FORWARD_TRANSITIVE, FULL, FULL_TRANSITIVE, NONE

default

NONE

  • SerializerDef

type

string

enum

AVRO, JSON, PROTOBUF

  • TopicSchemaDef

type

object

properties

  • Serializer

#/definitions/SerializerDef

  • Definition

oneOf

type

string

type

object

  • CompatibilityMode

#/definitions/CompatibilityMode
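
Because Definition accepts either a string or an object, code that reads a TopicSchemaDef has to handle both forms. The sketch below is one way to do that. The definition_as_string helper is hypothetical, and since this reference does not say how the tool interprets the string form, the sketch returns a string unchanged.

```python
import json


def definition_as_string(definition):
    """Return the schema definition as a string, whichever form was given."""
    if isinstance(definition, str):
        # String form: passed through untouched (its interpretation is not documented here).
        return definition
    if isinstance(definition, dict):
        # Object form: serialized to JSON text.
        return json.dumps(definition, sort_keys=True)
    raise TypeError("Definition must be a string or an object")


# Hypothetical TopicSchemaDef with an inline object definition.
schema_def = {
    "Serializer": "AVRO",
    "Definition": {"type": "string"},
    "CompatibilityMode": "BACKWARD",
}
definition_text = definition_as_string(schema_def["Definition"])
```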