设为首页 加入收藏

TOP

kafka - 自定义序列化器
2019-04-23 14:27:32 】 浏览:36
Tags:kafka 定义 序列化
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/u011669700/article/details/80005023

kafka - 自定义序列化器

在 kafka 中提供了以下的序列化器:

  1. ByteArraySerializer
  2. StringSerializer
  3. IntegerSerializer

但是内置提供的序列化器并不能满足大部分场景的需求,因此我们需要自定义序列化器


一、自定义序列化器

1.1 客户

我们首先创建一个简单的类用于表示客户:

public class Customer {
    private int customerID;
    private String customerName;
    public Customer(int ID, String name) {
        this.customerID = ID;
        this.customerName = name;
    }
    public int getID() {
        return customerID;
    }
    public String getName() {
        return customerName;
    }
}

1.2 定义序列化器

接下来为 Customer 类创建序列号器:

import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

import java.nio.ByteBuffer;
import java.util.Map;

/**
 * Created by Joe on 2018/4/19
 */
public class CustomerSerializer implements Serializer<Customer> {

    @Override
    public void configure(Map configs, boolean isKey) {
        // 不做任何配置
    }

    @Override
    /*
     * Customer对象被序列化成:
     * 表示customerID的4字节整数
     * 表示customerName长度的4字节整数(如果customerName为空,则长度为0)
     * 表示customerName的N个字节
     */
    public byte[] serialize(String topic, Customer data) {
        try {
            byte[] serializedName;
            int stringSize;
            if (data == null)
                return null;
            else {
                if (data.getName() != null) {
                    serializedName = data.getName().getBytes("UTF-8");
                    stringSize = serializedName.length;
                } else {
                    serializedName = new byte[0];
                    stringSize = 0;
                }
            }
            ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + stringSize);
            buffer.putInt(data.getID());
            buffer.putInt(stringSize);
            buffer.put(serializedName);

            return buffer.array();
        } catch (Exception e) {
            throw new SerializationException("Error when serializing Customer to  byte[] " + e);
        }
    }

    @Override
    public void close() {
        // nothing to close
    }
}

在定义好CustomerSerializer类之后,我们就可以定义ProducerRecord<String, Customer>类型的消息传递给 kafka 。

但是这样使用自定义序列化器依旧有缺陷:

  1. 如果我们有多种类型的消费者,可能需要把 customerID 字段变成长整型,或者为 Customer 添加 startDate 字段,这样就会出现新旧消息的兼容性问题。
  2. 多个团队同时向 Kafka 写入 Customer 数据,那么他们就需要使用相同的序列化器。如果序列化器发生改动,他们也要在同一时间修改代码。

因此,在实际使用中,不建议使用自定义序列化器,而是使用已有的序列化器和反序列化器,比如 JSON、Avro、Thrift 或 Protobuf。

二、Avro序列化

Apache Avro是一种与编程语言无关的序列化格式。Avro 数据通过与语言无关的 schema 来定义。schema 通过 JSON 来描述,数据被序列化成二进制文件或 JSON 文件,不过一般会使用二进制文件。

Avro 有一个很有意思的特性是,当负责写消息的应用程序使用了新的 schema,负责读消息的应用程序可以继续处理消息而无需做任何改动,这个特性使得它特别适合用在像 Kafka 这样的消息系统上。

假设有如下的 schema:

{
    "namespace": "customerManagement.avro",
    "type": "record",
    "name": "Customer",
    "fields": [{
            "name": "id",
            "type": "int"
        },
        {
            "name": "name",
            "type": "string"
        },
        {
            "name": "faxNumber",
            "type": ["null", "string"],
            "default": "null"
        }
    ]
}

这个 schema 表明了 id 和 name 属性是必需的。而 faxNumber 字段可选为空,且默认为 null。

假设这个 schema 已经正常运行一段时间,并且生成了很多数据。然后在后面的发展,决定使用 email 字段来进行代替,这时候我们就可以生成如下的新 schema。

{
    "namespace": "customerManagement.avro",
    "type": "record",
    "name": "Customer",
    "fields": [{
            "name": "id",
            "type": "int"
        },
        {
            "name": "name",
            "type": "string"
        },
        {
            "name": "email",
            "type": ["null", "string"],
            "default": "null"
        }
    ]
}

在应用程序升级之前,它们会调用类似 getName()、getId() 和 getFaxNumber() 这样的方法。如果碰到使用新 schema 构建的消息,getName() 和 getId() 方法仍然能够正常返回,但 getFaxNumber() 方法会返回 null,因为消息里不包含传真号码。

在应用程序升级之后,getEmail() 方法取代了 getFaxNumber() 方法。如果碰到一个使用旧 schema 构建的消息,那么 getEmail() 方法会返回 null,因为旧消息不包含邮件地址。

由此可以看出使用 Avro 的好处:当修改了消息的 schema,没有更新所有负责读取数据的应用程序。仍然不会出现异常或阻断性错误,也不需要对现有数据进行大幅更新。

但是依旧需要注意以下两点:

  1. 用于写入数据和读取数据的 schema 必须是相互兼容的。Avro 文档提到了一些兼容性原则。
  2. 反序列化器需要用到用于写入数据的 schema,即使它可能与用于读取数据的 schema 不一样。Avro 数据文件里就包含了用于写入数据的 schema。
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇Kafka集群错误汇总 下一篇CentOS7安装kafka(scala)和zook..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目