kafka - 自定义序列化器
在 kafka 中提供了以下的序列化器:
- ByteArraySerializer
- StringSerializer
- IntegerSerializer
但是内置提供的序列化器并不能满足大部分场景的需求,因此我们需要自定义序列化器
一、自定义序列化器
1.1 客户
我们首先创建一个简单的类用于表示客户:
public class Customer {
private int customerID;
private String customerName;
public Customer(int ID, String name) {
this.customerID = ID;
this.customerName = name;
}
public int getID() {
return customerID;
}
public String getName() {
return customerName;
}
}
1.2 定义序列化器
接下来为 Customer 类创建序列号器:
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;
import java.nio.ByteBuffer;
import java.util.Map;
public class CustomerSerializer implements Serializer<Customer> {
@Override
public void configure(Map configs, boolean isKey) {
}
@Override
public byte[] serialize(String topic, Customer data) {
try {
byte[] serializedName;
int stringSize;
if (data == null)
return null;
else {
if (data.getName() != null) {
serializedName = data.getName().getBytes("UTF-8");
stringSize = serializedName.length;
} else {
serializedName = new byte[0];
stringSize = 0;
}
}
ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + stringSize);
buffer.putInt(data.getID());
buffer.putInt(stringSize);
buffer.put(serializedName);
return buffer.array();
} catch (Exception e) {
throw new SerializationException("Error when serializing Customer to byte[] " + e);
}
}
@Override
public void close() {
}
}
在定义好CustomerSerializer
类之后,我们就可以定义ProducerRecord<String, Customer>
类型的消息传递给 kafka 。
但是这样使用自定义序列化器依旧有缺陷:
- 如果我们有多种类型的消费者,可能需要把 customerID 字段变成长整型,或者为 Customer 添加 startDate 字段,这样就会出现新旧消息的兼容性问题。
- 多个团队同时向 Kafka 写入 Customer 数据,那么他们就需要使用相同的序列化器。如果序列化器发生改动,他们也要在同一时间修改代码。
因此,在实际使用中,不建议使用自定义序列化器,而是使用已有的序列化器和反序列化器,比如 JSON、Avro、Thrift 或 Protobuf。
二、Avro序列化
Apache Avro是一种与编程语言无关的序列化格式。Avro 数据通过与语言无关的 schema 来定义。schema 通过 JSON 来描述,数据被序列化成二进制文件或 JSON 文件,不过一般会使用二进制文件。
Avro 有一个很有意思的特性是,当负责写消息的应用程序使用了新的 schema,负责读消息的应用程序可以继续处理消息而无需做任何改动,这个特性使得它特别适合用在像 Kafka 这样的消息系统上。
假设有如下的 schema:
{
"namespace": "customerManagement.avro",
"type": "record",
"name": "Customer",
"fields": [{
"name": "id",
"type": "int"
},
{
"name": "name",
"type": "string"
},
{
"name": "faxNumber",
"type": ["null", "string"],
"default": "null"
}
]
}
这个 schema 表明了 id 和 name 属性是必需的。而 faxNumber 字段可选为空,且默认为 null。
假设这个 schema 已经正常运行一段时间,并且生成了很多数据。然后在后面的发展,决定使用 email 字段来进行代替,这时候我们就可以生成如下的新 schema。
{
"namespace": "customerManagement.avro",
"type": "record",
"name": "Customer",
"fields": [{
"name": "id",
"type": "int"
},
{
"name": "name",
"type": "string"
},
{
"name": "email",
"type": ["null", "string"],
"default": "null"
}
]
}
在应用程序升级之前,它们会调用类似 getName()、getId() 和 getFaxNumber() 这样的方法。如果碰到使用新 schema 构建的消息,getName() 和 getId() 方法仍然能够正常返回,但 getFaxNumber() 方法会返回 null,因为消息里不包含传真号码。
在应用程序升级之后,getEmail() 方法取代了 getFaxNumber() 方法。如果碰到一个使用旧 schema 构建的消息,那么 getEmail() 方法会返回 null,因为旧消息不包含邮件地址。
由此可以看出使用 Avro 的好处:当修改了消息的 schema,没有更新所有负责读取数据的应用程序。仍然不会出现异常或阻断性错误,也不需要对现有数据进行大幅更新。
但是依旧需要注意以下两点:
- 用于写入数据和读取数据的 schema 必须是相互兼容的。Avro 文档提到了一些兼容性原则。
- 反序列化器需要用到用于写入数据的 schema,即使它可能与用于读取数据的 schema 不一样。Avro 数据文件里就包含了用于写入数据的 schema。