usage: producer-performance [-h] --topic TOPIC --num-records NUM-RECORDS
                            [--payload-delimiter PAYLOAD-DELIMITER]
                            --throughput THROUGHPUT
                            [--producer-props PROP-NAME=PROP-VALUE [PROP-NAME=PROP-VALUE ...]]
                            [--producer.config CONFIG-FILE]
                            [--print-metrics]
                            [--transactional-id TRANSACTIONAL-ID]
                            [--transaction-duration-ms TRANSACTION-DURATION]
                            (--record-size RECORD-SIZE | --payload-file PAYLOAD-FILE)
Note: --producer-props lets you pass producer configuration properties on the command line; --producer.config lets you point at a properties file such as config/producer.properties so that it takes effect.
This tool is used to verify the producer performance.
optional arguments:
  -h, --help            show this help message and exit
  --topic TOPIC         produce messages to this topic
  --num-records NUM-RECORDS
                        number of messages to produce
  --payload-delimiter PAYLOAD-DELIMITER
                        provides delimiter to be used when --payload-file is
                        provided. Defaults to newline. Note that this
                        parameter will be ignored if --payload-file is not
                        provided. (default: \n)
  --throughput THROUGHPUT
                        throttle maximum message throughput to
                        *approximately* THROUGHPUT messages/sec
  --producer-props PROP-NAME=PROP-VALUE [PROP-NAME=PROP-VALUE ...]
                        kafka producer related configuration properties like
                        bootstrap.servers, client.id etc. These configs take
                        precedence over those passed via --producer.config.
  --producer.config CONFIG-FILE
                        producer config properties file.
  --print-metrics       print out metrics at the end of the test. (default:
                        false)
  --transactional-id TRANSACTIONAL-ID
                        The transactionalId to use if transaction-duration-ms
                        is > 0. Useful when testing the performance of
                        concurrent transactions. (default:
                        performance-producer-default-transactional-id)
  --transaction-duration-ms TRANSACTION-DURATION
                        The max age of each transaction. The
                        commitTransaction will be called after this time has
                        elapsed. Transactions are only enabled if this value
                        is positive. (default: 0)

  either --record-size or --payload-file must be specified but not both.

  --record-size RECORD-SIZE
                        message size in bytes. Note that you must provide
                        exactly one of --record-size or --payload-file.
  --payload-file PAYLOAD-FILE
                        file to read the message payloads from. This works
                        only for UTF-8 encoded text files. Payloads will be
                        read from this file and a payload will be randomly
                        selected when sending messages. Note that you must
                        provide exactly one of --record-size or
                        --payload-file.
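As a concrete example, the flags above can be combined as follows (the broker address, topic name, and property values are placeholders, not taken from this document):

```shell
# Send 100,000 records of 1 KB each, throttled to ~20,000 messages/sec.
# bootstrap.servers points at your own broker.
./kafka-producer-perf-test.sh \
  --topic test \
  --num-records 100000 \
  --record-size 1024 \
  --throughput 20000 \
  --producer-props bootstrap.servers=localhost:9092 acks=1
```

Exactly one of --record-size or --payload-file must be given; swap --record-size for --payload-file to draw payloads from a UTF-8 text file instead.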
The options below are for the consumer performance tool (kafka-consumer-perf-test.sh):

Option                                   Description
------                                   -----------
--broker-list <String: host>             REQUIRED: The server(s) to connect to.
--consumer.config <String: config file>  Consumer config properties file.
--date-format <String: date format>      The date format to use for formatting
                                           the time field. See java.text.
                                           SimpleDateFormat for options.
                                           (default: yyyy-MM-dd HH:mm:ss:SSS)
--fetch-size <Integer: size>             The amount of data to fetch in a
                                           single request. (default: 1048576)
--from-latest                            If the consumer does not already have
                                           an established offset to consume
                                           from, start with the latest message
                                           present in the log rather than the
                                           earliest message.
--group <String: gid>                    The group id to consume on. (default:
                                           perf-consumer-61669)
--help                                   Print usage.
--hide-header                            If set, skips printing the header for
                                           the stats
--messages <Long: count>                 REQUIRED: The number of messages to
                                           send or consume
--num-fetch-threads <Integer: count>     Number of fetcher threads.
                                           (default: 1)
--print-metrics                          Print out the metrics.
--reporting-interval <Integer:           Interval in milliseconds at which to
  interval_ms>                             print progress info. (default: 5000)
--show-detailed-stats                    If set, stats are reported for each
                                           reporting interval as configured by
                                           reporting-interval
--socket-buffer-size <Integer: size>     The size of the tcp RECV size.
                                           (default: 2097152)
--threads <Integer: count>               Number of processing threads.
                                           (default: 10)
--timeout [Long: milliseconds]           The maximum allowed time in
                                           milliseconds between returned
                                           records. (default: 10000)
--topic <String: topic>                  REQUIRED: The topic to consume from.
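A matching consumer-side run using the options in the table might look like this (the broker address and topic are placeholders):

```shell
# Consume 100,000 messages from the test topic and print per-interval stats.
./kafka-consumer-perf-test.sh \
  --broker-list localhost:9092 \
  --topic test \
  --messages 100000 \
  --threads 1 \
  --show-detailed-stats
```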
2. Errors encountered while using the Kafka producer performance test tool
Error 1
Description: the following command was run
/kafka-producer-perf-test.sh --topic test --num-records 100000 --record-size 67108864 --throughput 20000 --producer-props bootstrap.servers=219.216.65.104:9092 // the record size here is 64 MB
Error: the tool reported
org.apache.kafka.common.errors.RecordTooLargeException: The message is 67108864 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.
org.apache.kafka.common.errors.RecordTooLargeException: The message is 67108864 bytes when serialized which is larger than the total memory buffer you have configured with the buffer.memory configuration.
org.apache.kafka.common.errors.RecordTooLargeException: The request included a message larger than the max message size the server will accept
java.lang.OutOfMemoryError: Java heap space
at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
at org.apache.kafka.clients.producer.internals.BufferPool.allocateByteBuffer(BufferPool.java:219)
at org.apache.kafka.clients.producer.internals.BufferPool.safeAllocateByteBuffer(BufferPool.java:200)
at org.apache.kafka.clients.producer.internals.BufferPool.allocate(BufferPool.java:183)
at org.apache.kafka.clients.producer.internals.RecordAccumulator.append(RecordAccumulator.java:210)
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:904)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:843)
at org.apache.kafka.tools.ProducerPerformance.main(ProducerPerformance.java:143)
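Note that the first two exceptions are independent of heap size: the Kafka producer's default max.request.size is 1048576 bytes (1 MB) and its default buffer.memory is 33554432 bytes (32 MB), both far smaller than the 67108864-byte (64 MB) record requested here. A quick sanity check of the arithmetic:

```shell
# Compare the 64 MB record against the producer's default limits.
record_size=67108864        # bytes, from --record-size
max_request_size=1048576    # producer default max.request.size (1 MB)
buffer_memory=33554432      # producer default buffer.memory (32 MB)

[ "$record_size" -gt "$max_request_size" ] && echo "exceeds max.request.size"
[ "$record_size" -gt "$buffer_memory" ] && echo "exceeds buffer.memory"
```

So even after fixing the OutOfMemoryError below, these producer (and the broker-side message size) limits must also be raised before a 64 MB record can be sent.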
Fix: the java.lang.OutOfMemoryError above comes from the tool's default 512 MB heap, which cannot hold a 64 MB record in the producer's buffer pool. Edit the launcher script:
vim /bin/kafka-producer-perf-test.sh
#########
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx512M" # change -Xmx512M to -Xmx4G
fi
exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.tools.ProducerPerformance "$@"
Likewise, if you need to change the JVM heap size for the whole cluster, modify the corresponding settings in kafka-run-class.sh and kafka-server-start.sh.
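Alternatively, the heap can be raised for a single run without editing the script, since kafka-run-class.sh honors the KAFKA_HEAP_OPTS environment variable when it is already set. To actually send 64 MB records, the size-related producer properties must also be raised on the command line (the values and broker address below are illustrative, and the broker/topic must additionally allow messages this large, e.g. via message.max.bytes):

```shell
# One-off heap override; kafka-run-class.sh only sets KAFKA_HEAP_OPTS
# itself when the variable is empty, so this value wins.
KAFKA_HEAP_OPTS="-Xmx4G" ./kafka-producer-perf-test.sh \
  --topic test --num-records 100 --record-size 67108864 --throughput 1 \
  --producer-props bootstrap.servers=219.216.65.104:9092 \
    max.request.size=104857600 buffer.memory=134217728
```

This avoids permanently modifying the distribution's scripts while still addressing both the OutOfMemoryError and the client-side RecordTooLargeException.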