
AvroConfluent


Description

Apache Avro is a row-oriented serialization format that uses binary encoding for efficient data processing. The AvroConfluent format supports reading and writing Avro-encoded messages using the Confluent Schema Registry (or API-compatible services).

Each message uses the Confluent wire format: a magic byte (0x00) followed by a 4-byte big-endian schema ID, followed by the Avro binary datum. When reading, ClickHouse resolves the schema ID by querying the registry. When writing, ClickHouse registers the schema derived from the output columns and prepends the resulting ID to each row. Schemas are cached for optimal performance.
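The framing can be sketched in a few lines of Python (illustrative only, not ClickHouse code; the helper names are made up):

```python
import struct

MAGIC_BYTE = 0  # Confluent wire format starts every message with 0x00

def frame(schema_id: int, avro_datum: bytes) -> bytes:
    """Prepend the Confluent wire-format header to an Avro binary datum."""
    # ">bI" = big-endian: 1-byte magic, 4-byte unsigned schema ID
    return struct.pack(">bI", MAGIC_BYTE, schema_id) + avro_datum

def unframe(message: bytes) -> tuple[int, bytes]:
    """Split a message into (schema_id, avro_datum), validating the magic byte."""
    magic, schema_id = struct.unpack(">bI", message[:5])
    if magic != MAGIC_BYTE:
        raise ValueError(f"unexpected magic byte: {magic:#x}")
    return schema_id, message[5:]

# Round-trip a datum under schema ID 42
print(unframe(frame(42, b"\x02a")))  # (42, b'\x02a')
```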

Data type mapping

The table below shows all data types supported by the Apache Avro format, and their corresponding ClickHouse data types in INSERT and SELECT queries.

Avro data type (INSERT) | ClickHouse data type | Avro data type (SELECT)
--- | --- | ---
boolean, int, long, float, double | Int(8/16/32), UInt(8/16/32) | int
boolean, int, long, float, double | Int64, UInt64 | long
boolean, int, long, float, double | Float32 | float
boolean, int, long, float, double | Float64 | double
bytes, string, fixed, enum | String | bytes or string *
bytes, string, fixed | FixedString(N) | fixed(N)
enum | Enum(8/16) | enum
array(T) | Array(T) | array(T)
map(V, K) | Map(V, K) | map(string, K)
union(null, T), union(T, null) | Nullable(T) | union(null, T)
union(T1, T2, …) ** | Variant(T1, T2, …) | union(T1, T2, …) **
null | Nullable(Nothing) | null
int (date) *** | Date, Date32 | int (date) ***
long (timestamp-millis) *** | DateTime64(3) | long (timestamp-millis) ***
long (timestamp-micros) *** | DateTime64(6) | long (timestamp-micros) ***
bytes (decimal) *** | DateTime64(N) | bytes (decimal) ***
int | IPv4 | int
fixed(16) | IPv6 | fixed(16)
bytes (decimal) *** | Decimal(P, S) | bytes (decimal) ***
string (uuid) *** | UUID | string (uuid) ***
fixed(16) | Int128/UInt128 | fixed(16)
fixed(32) | Int256/UInt256 | fixed(32)
record | Tuple | record

* bytes is the default; this is controlled by the setting output_format_avro_string_column_pattern

** The Variant type implicitly accepts null as a field value, so, for example, the Avro union(T1, T2, null) is converted to Variant(T1, T2). As a result, when producing Avro from ClickHouse, we always have to include the null type in the Avro union type set, because we don't know whether any value is actually null during schema inference.
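As an illustration (the field name here is hypothetical), an Avro record field declared with a multi-branch union such as the following would map to Variant(Int64, String) on read, and a Variant(Int64, String) column would be emitted with the same union, null included, on write:

```json
{
  "name": "value",
  "type": ["null", "long", "string"]
}
```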

*** Avro logical types

Unsupported Avro logical data types:

  • time-millis
  • time-micros
  • duration

Format settings

Setting | Description | Default
--- | --- | ---
input_format_avro_allow_missing_fields | Whether to use a default value instead of throwing an error when a field is not found in the schema. | 0
input_format_avro_null_as_default | Whether to use a default value instead of throwing an error when inserting a null value into a non-nullable column. | 0
format_avro_schema_registry_url | The Confluent Schema Registry URL. For basic authentication, URL-encoded credentials can be included directly in the URL. | (empty)
format_avro_schema_registry_connection_timeout | Connection timeout in seconds for the Schema Registry HTTP client (used for both schema fetch and registration). Must be greater than 0 and less than 600 (10 minutes). | 1
format_avro_schema_registry_send_timeout | Send timeout in seconds for the Schema Registry HTTP client. Must be greater than 0 and less than 600 (10 minutes). | 1
format_avro_schema_registry_receive_timeout | Receive timeout in seconds for the Schema Registry HTTP client. Must be greater than 0 and less than 600 (10 minutes). | 1
output_format_avro_confluent_subject | For output: the subject name under which the schema is registered in the Schema Registry. Required when writing. | (empty)
output_format_avro_string_column_pattern | For output: regexp of String columns to serialize as Avro string (default is bytes). | (empty)

Examples

Reading from Kafka

To read an Avro-encoded Kafka topic using the Kafka table engine, use the format_avro_schema_registry_url setting to provide the URL of the schema registry.

CREATE TABLE topic1_stream
(
    field1 String,
    field2 String
)
ENGINE = Kafka()
SETTINGS
kafka_broker_list = 'kafka-broker',
kafka_topic_list = 'topic1',
kafka_group_name = 'group1',
kafka_format = 'AvroConfluent',
format_avro_schema_registry_url = 'http://schema-registry-url';

SELECT * FROM topic1_stream;

Writing to Kafka

To write AvroConfluent messages to a Kafka topic, set both the schema registry URL and the subject name. The schema is automatically registered with the registry on the first write.

CREATE TABLE topic1_sink
(
    field1 String,
    field2 String
)
ENGINE = Kafka()
SETTINGS
kafka_broker_list = 'kafka-broker',
kafka_topic_list = 'topic1',
kafka_format = 'AvroConfluent',
format_avro_schema_registry_url = 'http://schema-registry-url',
output_format_avro_confluent_subject = 'topic1-value';

INSERT INTO topic1_sink VALUES ('hello', 'world');

Using basic authentication

If your schema registry requires basic authentication (e.g., if you're using Confluent Cloud), you can provide URL-encoded credentials in the format_avro_schema_registry_url setting.

CREATE TABLE topic1_stream
(
    field1 String,
    field2 String
)
ENGINE = Kafka()
SETTINGS
kafka_broker_list = 'kafka-broker',
kafka_topic_list = 'topic1',
kafka_group_name = 'group1',
kafka_format = 'AvroConfluent',
format_avro_schema_registry_url = 'https://<username>:<password>@schema-registry-url';
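Credentials containing reserved characters (such as ':', '@', or '/') must be percent-encoded before being embedded in the URL, or they will be mistaken for URL delimiters. A small Python sketch (the helper name and credential values are illustrative):

```python
from urllib.parse import quote

def registry_url(user: str, password: str, host: str) -> str:
    # safe='' forces every reserved character to be percent-encoded,
    # leaving only unreserved characters (letters, digits, '_.-~') as-is.
    return f"https://{quote(user, safe='')}:{quote(password, safe='')}@{host}"

print(registry_url("api-key", "p@ss/word:1", "schema-registry-url"))
# https://api-key:p%40ss%2Fword%3A1@schema-registry-url
```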

Troubleshooting

To monitor ingestion progress and debug consumer errors, query the system.kafka_consumers table. If your deployment has multiple replicas (e.g., ClickHouse Cloud), you must use the clusterAllReplicas table function.

SELECT * FROM clusterAllReplicas('default', system.kafka_consumers)
ORDER BY assignments.partition_id ASC;

If you run into schema resolution issues, you can use kafkacat with clickhouse-local to troubleshoot:

$ kafkacat -b kafka-broker -C -t topic1 -o beginning -f '%s' -c 3 | \
    clickhouse-local --input-format AvroConfluent \
    --format_avro_schema_registry_url 'http://schema-registry' \
    -S "field1 Int64, field2 String" -q 'SELECT * FROM table'
1 a
2 b
3 c