设为首页 加入收藏

TOP

RdKafka文档翻译(一)
2019-09-19 11:10:25 】 浏览:256
Tags:RdKafka 文档 翻译

函数
string rd_kafka_err2str ( integer $err ) 将rdkafka错误代码转换为字符串

integer rd_kafka_errno2err ( integer $errnox ) 将系统errno转换为Kafka错误代码

integer rd_kafka_errno ( void ) 返回系统errno

integer rd_kafka_offset_tail ( integer $cnt ) 返回一个特殊的偏移量值,该值可用于在主题尾部之前开始使用cnt消息

RdKafka\KafkaConsume类
这是高水平消费者,支持自动分区/撤销(pecl rdkafka>=1.0.0,librdkafka>=0.9)

1)public void RdKafka\KafkaConsumer::assign ([ array $topic_partitions = NULL ] )
更新分配集到$topic_partitions,可以通过调用RdKafka\Conf::setDefaultTopicConf()来更改主题的默认配置
$kafkaConsumer->assign([
new RdKafka\TopicPartition("logs", 0),
new RdKafka\TopicPartition("logs", 1),
]);

2)public void RdKafka\KafkaConsumer::commit ([ mixed $message_or_offsets = NULL ] )
同步提交偏移,直到提交偏移或提交失败为止。
如果注册了COMMIT_CB回调,那么它将被调用,并包含未来要使用的调用的提交详细信息。
参数
message_or_offsets
When NULL, commit offsets for the current assignment.
When a RdKafka\Message, commit offset for a single topic+partition based on the message.
When an array of RdKafka\TopicPartition, commit offsets for the provided list of partitions.
异常
Errors/Exceptions
Throws RdKafka\Exception on errors.
例子:
// Commit offsets for the current assignment
$kafkaConsumer->commit();

// Commit offsets based on the message's topic, partition, and offset
$kafkaConsumer->commit($message);

// Commit offsets by providing a list of TopicPartition
$kafkaConsumer->commit([
new RdKafka\TopicPartition($topic, $partition, $offset),
]);

3)public void RdKafka\KafkaConsumer::commitAsync ([ string $message_or_offsets = NULL ] )
异步提交偏移
4)public RdKafka\KafkaConsumer::__construct ( RdKafka\Conf $conf )
参数
conf (RdKafka\Conf)
The conf object must have group.id set to the consumer group to join.
conf对象必须将Group.id设置为要加入的消费者组。
示例:
$conf = new RdKafka\Conf();
$conf->set("group.id", "myGroupID");

$kafkaConsumer = new RdKafka\KafkaConsumer($conf);
5)public RdKafka\Message RdKafka\KafkaConsumer::consume ( string $timeout_ms )
使用消息或获取错误事件,触发回调
将自动调用任何此类排队事件的已注册回调,包括rebalance_cb, event_cb, commit_cb, etc.
参数
timeout_ms (int) 超时时间(milliseconds)
返回值
Returns a RdKafka\Message. On error or timeout, RdKafka\Message::$err is != RD_KAFKA_ERR_NO_ERROR, and other properties should be ignored.
注意:
应用程序应确保定期调用consume (),即使没有预期的消息,为等待调用的排队回调提供服务,当rebalnce_cb已经注册时,这一点尤其重要,因为需要正确地调用和处理它,以同步内部使用者状态。


while (true) {
$message = $kafkaConsumer->consume(3600e3);
switch ($message->err) {
case RD_KAFKA_RESP_ERR_NO_ERROR:
handle($message);
break;
case RD_KAFKA_RESP_ERR__TIMED_OUT:
echo "Timedout\n";
break;
default:
throw new \Exception($message->errstr());
break;
}
}

6)public array RdKafka\KafkaConsumer::getAssignment ( void )
返回由assign设置 或 再平衡的 当前分区分配集
Returns the current partition assignment as set by RdKafka\KafkaConsumer::assign() or by rebalancing.
返回值
Returns an array of RdKafka\TopicPartition 返回RdKafka\Topic分区的数组
Errors/Exceptions
Throws RdKafka\Exception on errors.

6)public RdKafka\Metadata RdKafka\KafkaConsumer::getMetadata ( bool $all_topics , RdKafka\KafkaConsumerTopic $only_topic = NULL , int $timeout_ms)
向代理请求元数据

参数
all_topics (bool)
When TRUE, request info about all topics in cluster. Else, only request info about locally known topics.如果为真,请求有关集群中所有主题的信息。否则,只请求有关本地已知主题的信息
only_topic (RdKafka\KafkaConsumerTopic)
When non-null, only request info about this topic当非空时,只请求有关此主

首页 上一页 1 2 3 4 5 6 7 下一页 尾页 1/9/9
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇laravel模板布局 下一篇用 PHP 函数变量数组改变代码结构

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目