{"rsdb":{"rid":"309154","subhead":"","postdate":"0","aid":"224316","fid":"120","uid":"1","topic":"1","content":"
\n \n \n
\n \n
\n

kafka\u7248\u672c0.8.2.1<\/p>\n

Java\u5ba2\u6237\u7aef\u7248\u672c0.9.0.0<\/p>\n

<\/p>\n

\u4e3a\u4e86\u66f4\u597d\u7684\u5b9e\u73b0\u8d1f\u8f7d\u5747\u8861\u548c\u6d88\u606f\u7684\u987a\u5e8f\u6027\uff0cKafka Producer\u53ef\u4ee5\u901a\u8fc7\u5206\u53d1\u7b56\u7565\u53d1\u9001\u7ed9\u6307\u5b9a\u7684Partition\u3002Kafka\u4fdd\u8bc1\u5728partition\u4e2d\u7684\u6d88\u606f\u662f\u6709\u5e8f\u7684\u3002Kafka Java\u5ba2\u6237\u7aef\u6709\u9ed8\u8ba4\u7684Partitioner\u3002\u5b9e\u73b0\u5982\u4e0b\uff1a<\/p>\n

<\/p>\n

\n
\"\u590d\u5236\u4ee3\u7801\"<\/a><\/span><\/div>\n
public<\/span> int<\/span> partition(ProducerRecord<byte<\/span>[], byte<\/span>[]> record, Cluster cluster) {\n        List partitions <\/span>= cluster.partitionsForTopic(record.topic());\n        <\/span>int<\/span> numPartitions = partitions.size();\n        <\/span>if<\/span>(record.partition() != null<\/span>) {\n            <\/span>if<\/span>(record.partition().intValue() >= 0 && record.partition().intValue() < numPartitions) {\n                <\/span>return<\/span> record.partition().intValue();\n            } <\/span>else<\/span> {\n                <\/span>throw<\/span> new<\/span> IllegalArgumentException(\"Invalid partition given with record: \" + record.partition() + \" is not in the range [0...\" + numPartitions + \"].\");\n            }\n        } <\/span>else<\/span> if<\/span>(record.key() == null<\/span>) {\n            <\/span>int<\/span> nextValue = this<\/span>.counter.getAndIncrement();\n            List availablePartitions <\/span>= cluster.availablePartitionsForTopic(record.topic());\n            <\/span>if<\/span>(availablePartitions.size() > 0) {\n                <\/span>int<\/span> part = Utils.abs(nextValue) % availablePartitions.size();\n                <\/span>return<\/span> ((PartitionInfo)availablePartitions.get(part)).partition();\n            } <\/span>else<\/span> {\n                <\/span>return<\/span> Utils.abs(nextValue) % numPartitions;\n            }\n        } <\/span>else<\/span> {\n            <\/span>return<\/span> Utils.abs(Utils.murmur2((byte<\/span>[])record.key())) % numPartitions;\n        }\n    }<\/span><\/pre>\n
\"\u590d\u5236\u4ee3\u7801\"<\/a><\/span><\/div>\n<\/div>\n

\u4ece\u6e90\u7801\u53ef\u4ee5\u770b\u51fa\uff0c\u9996\u5148\u83b7\u53d6topic\u7684\u6240\u6709Patition\uff0c\u5982\u679c\u5ba2\u6237\u7aef\u4e0d\u6307\u5b9aPatition\uff0c\u4e5f\u6ca1\u6709\u6307\u5b9aKey\u7684\u8bdd\uff0c\u4f7f\u7528\u81ea\u589e\u957f\u7684\u6570\u5b57\u53d6\u4f59\u6570\u7684\u65b9\u5f0f\u5b9e\u73b0\u6307\u5b9a\u7684Partition\u3002\u8fd9\u6837Kafka\u5c06\u5e73\u5747\u7684\u5411Partition\u4e2d\u751f\u4ea7\u6570\u636e\u3002\u6d4b\u8bd5\u4ee3\u7801\u5982\u4e0b\uff1a<\/p>\n

Producer\uff1a<\/p>\n

\n
\"\u590d\u5236\u4ee3\u7801\"<\/a><\/span><\/div>\n
String topic = \"haoxy1\";\n\n            <\/span>int<\/span> i = 0;\n\n            Properties props <\/span>= new<\/span> Properties();\n            props.put(<\/span>\"bootstrap.servers\", \"10.23.22.237:9092,10.23.22.238:9092,10.23.22.239:9092\");\n            props.put(<\/span>\"key.serializer\", \"org.apache.kafka.common.serialization.StringSerializer\");\n            props.put(<\/span>\"value.serializer\", \"org.apache.kafka.common.serialization.StringSerializer\");\n\n            KafkaProducer<\/span><String, String> producer = new<\/span> KafkaProducer<String, String>(props);\n\n            System.out.println(<\/span>\"partitions count \" + producer.partitionsFor(topic));\n            <\/span>while<\/span>(true<\/span>) {\n                String msg <\/span>= \"test\"+i++;\n\n                ProducerRecord<\/span><String, String> producerRecord = new<\/span> ProducerRecord<String, String>(topic, msg);\n                producer.send(producerRecord);\n                System.out.println(<\/span>\"send \" + msg);\n                Thread.sleep(<\/span>5000);\n            }<\/span><\/pre>\n
\"\u590d\u5236\u4ee3\u7801\"<\/a><\/span><\/div>\n<\/div>\n

Consumer\uff1a<\/p>\n

\n
\"\u590d\u5236\u4ee3\u7801\"<\/a><\/span><\/div>\n
String topic = \"haoxy1\";\n\n            Properties props <\/span>= new<\/span> Properties();\n            props.put(<\/span>\"zookeeper.connect\", \"10.23.22.237:2181,10.23.22.238:2181,10.23.22.239:2181\");\n            props.put(<\/span>\"group.id\", \"cg.nick\");\n            props.put(<\/span>\"consumer.id\", \"c.nick\");\n\n            Map<\/span><String, Integer> topicCountMap = new<\/span> HashMap<String, Integer>();\n            topicCountMap.put(topic, <\/span>3);\n            ConsumerConfig consumerConfig <\/span>= new<\/span> ConsumerConfig(props);\n            ConsumerConnector consumer <\/span>= Consumer.createJavaConsumerConnector(consumerConfig);\n            Map<\/span><String, List<KafkaStream<byte<\/span>[], byte<\/span>[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);\n            List<\/span><KafkaStream<byte<\/span>[], byte<\/span>[]>> streams = consumerMap.get(topic);\n            ExecutorService executor <\/span>= Executors.newFixedThreadPool(3);\n            <\/span>for<\/span> (final<\/span> KafkaStream stream : streams) {\n                executor.submit(<\/span>new<\/span> Runnable() {\n                    <\/span>public<\/span> void<\/span> run() {\n                        ConsumerIterator<\/span><byte<\/span>[], byte<\/span>[]> it = stream.iterator();\n                        <\/span>while<\/span> (it.hasNext()) {\n                            MessageAndMetadata<\/span><byte<\/span>[], byte<\/span>[]> mm = it.next();\n                            System.out.println(String.format(<\/span>\"partition = %s, offset = %d, key = %s, value = %s\", mm.partition(), mm.offset(), mm.key(), new<\/span> String(mm.message())));\n                        }\n                    }\n                });\n            }<\/span><\/pre>\n
\"\u590d\u5236\u4ee3\u7801\"<\/a><\/span><\/div>\n<\/div>\n

\u4ece\u6d4b\u8bd5\u7ed3\u679c\u7ed3\u679c\u770b\u51fa\uff0c\u662f\u5e73\u5747\u5206\u914d\u7684\uff1a<\/p>\n

\n
\"\u590d\u5236\u4ee3\u7801\"<\/a><\/span><\/div>\n
partition = 1, offset = 416, key = null<\/span>, value = test9\npartition <\/span>= 0, offset = 386, key = null<\/span>, value = test10\npartition <\/span>= 2, offset = 454, key = null<\/span>, value = test11\npartition <\/span>= 1, offset = 417, key = null<\/span>, value = test12\npartition <\/span>= 0, offset = 387, key = null<\/span>, value = test13\npartition <\/span>= 2, offset = 455, key = null<\/span>, value = test14\npartition <\/span>= 1, offset = 418, key = null<\/span>, value = test15\npartition <\/span>= 0, offset = 388, key = null<\/span>, value = test16<\/pre>\n
\"\u590d\u5236\u4ee3\u7801\"<\/a><\/span><\/div>\n<\/div>\n

<\/p>\n

\u5982\u679c\u60f3\u8981\u63a7\u5236\u53d1\u9001\u7684partition\uff0c\u5219\u6709\u4e24\u79cd\u65b9\u5f0f\uff0c\u4e00\u79cd\u662f\u6307\u5b9apartition\uff0c\u53e6\u4e00\u79cd\u5c31\u662f\u6839\u636eKey\u81ea\u5df1\u5199\u7b97\u6cd5\u3002\u7ee7\u627fPartitioner\u63a5\u53e3\uff0c\u5b9e\u73b0\u5176partition\u65b9\u6cd5\u3002\u5e76\u4e14\u914d\u7f6e\u542f\u52a8\u53c2\u6570<\/p>\n

props.put(\"partitioner.class\",\"TestPartitioner\")\u3002<\/pre>\n

\u6bd4\u5982\u9700\u8981\u5b9e\u73b0<\/p>\n

key=\u2019aaa\u2019 \u7684\u90fd\u8fdbpartition 0<\/p>\n

key=\u2019bbb\u2019 \u7684\u90fd\u8fdbpartition 1<\/p>\n

key=\u2019bbb\u2019 \u7684\u90fd\u8fdbpartition 2<\/p>\n

<\/p>\n

\n
\"\u590d\u5236\u4ee3\u7801\"<\/a><\/span><\/div>\n
public<\/span> class<\/span> TestPartitioner implements<\/span> Partitioner {\n    <\/span>public<\/span> int<\/span> partition(String s, Object key, byte<\/span>[] bytes, Object o1, byte<\/span>[] bytes1, Cluster cluster) {\n        <\/span>if<\/span> (key.toString().equals(\"aaa\"))\n            <\/span>return<\/span> 0;\n        <\/span>else<\/span> if<\/span> (key.toString().equals(\"bbb\"))\n            <\/span>return<\/span> 1;\n        <\/span>else<\/span> if<\/span> (key.toString().equals(\"ccc\"))\n            <\/span>return<\/span> 2;\n        <\/span>else<\/span> return<\/span> 0;\n    }\n\n    <\/span>public<\/span> void<\/span> close() {\n\n    }\n\n    <\/span>public<\/span> void<\/span> configure(Map<String, > map) {\n\n    }\n}<\/span><\/pre>\n
\"\u590d\u5236\u4ee3\u7801\"<\/a><\/span><\/div>\n<\/div>\n

\u6d4b\u8bd5\u7ed3\u679c\uff1a<\/p>\n

\n
\"\u590d\u5236\u4ee3\u7801\"<\/a><\/span><\/div>\n
partition = 0, offset = 438, key = aaa, value = test32\npartition <\/span>= 1, offset = 448, key = bbb, value = test33\npartition <\/span>= 2, offset = 486, key = ccc, value = test34\npartition <\/span>= 0, offset = 439, key = aaa, value = test35\npartition <\/span>= 1, offset = 449, key = bbb, value = test36\npartition <\/span>= 2, offset = 487, key = ccc, value = test37\npartition <\/span>= 0, offset = 440, key = aaa, value = test38\npartition <\/span>= 1, offset = 450, key = bbb, value = test39\npartition <\/span>= 2, offset = 488, key = ccc, value = test40\npartition <\/span>= 0, offset = 441, key = aaa, value = test41\npartition <\/span>= 1, offset = 451, key = bbb, value = test42\npartition <\/span>= 2, offset = 489, key = ccc, value = test43\npartition <\/span>= 0, offset = 442, key = aaa, value = test44<\/pre>\n
\"\u590d\u5236\u4ee3\u7801\"<\/a><\/span><\/div>\n<\/div>\n

<\/p>\n

\u5982\u679c\u4f60\u4f7f\u7528\u7684\u4e0d\u662fJava\u7684\u5ba2\u6237\u7aef\uff0c\u662fjava<\/a>api\u4e0b\u9762\u7684Producer\u7684\u8bdd\uff0c\u81ea\u5b9a\u4e49\u7684\u5206\u533a\u7c7b\u9700\u8981\u5b9e\u73b0kafka.producer.Partitioner\uff0c\u5e76\u4e14\u6709\u6784\u9020\u51fd\u6570\u3002<\/p>\n

\n
\"\u590d\u5236\u4ee3\u7801\"<\/a><\/span><\/div>\n
public<\/span> class<\/span> TestPartitioner implements<\/span> Partitioner {\n    <\/span>public<\/span> TestPartitioner (VerifiableProperties props) {\n\n    }\n    <\/span>public<\/span> int<\/span> partition(Object o, int<\/span> i) {\n        <\/span>if<\/span> (o.toString().equals(\"aaa\"))\n            <\/span>return<\/span> 0;\n        <\/span>else<\/span> if<\/span> (o.toString().equals(\"bbb\"))\n            <\/span>return<\/span> 1;\n        <\/span>else<\/span> if<\/span> (o.toString().equals(\"ccc\"))\n            <\/span>return<\/span> 2;\n        <\/span>else<\/span> return<\/span> 0;\n    }\n}<\/span><\/pre>\n
\"\u590d\u5236\u4ee3\u7801\"<\/a><\/span><\/div>\n<\/div>\n<\/div>\n <\/div>\n <\/div>","orderid":"0","title":"kafka   Partition\u5206\u53d1\u7b56\u7565","smalltitle":"","mid":"0","fname":"Kafka","special_id":"0","bak_id":"0","info":"0","hits":"869","pages":"1","comments":"0","posttime":"2019-05-16 02:25:45","list":"1557944745","username":"admin","author":"","copyfrom":"","copyfromurl":"","titlecolor":"","fonttype":"0","titleicon":"0","picurl":"http:\/\/common.cnblogs.com\/images\/copycode.gif","ispic":"1","yz":"1","yzer":"","yztime":"0","levels":"0","levelstime":"0","keywords":"kafka<\/A>  <\/A> Partition<\/A> \u5206\u53d1<\/A> \u7b56\u7565<\/A>","jumpurl":"","iframeurl":"","style":"","template":"a:3:{s:4:\"foot\";s:0:\"\";s:8:\"bencandy\";s:0:\"\";s:4:\"head\";s:0:\"\";}","target":"0","ip":"47.106.78.186","lastfid":"0","money":"0","buyuser":"","passwd":"","allowdown":"","allowview":"","editer":"","edittime":"0","begintime":"0","endtime":"0","description":" kafka\u7248\u672c0.8.2.1Java\u5ba2\u6237\u7aef\u7248\u672c0.9.0.0\u4e3a\u4e86\u66f4\u597d\u7684\u5b9e\u73b0\u8d1f\u8f7d\u5747\u8861\u548c\u6d88\u606f\u7684\u987a\u5e8f\u6027\uff0cKafka Producer\u53ef\u4ee5\u901a\u8fc7\u5206\u53d1\u7b56\u7565\u53d1\u9001\u7ed9\u6307\u5b9a\u7684Partition\u3002Kafka\u4fdd\u8bc1\u5728partition\u4e2d\u7684\u6d88\u606f\u662f\u6709\u5e8f\u7684\u3002Kafka Java\u5ba2\u6237\u7aef..","lastview":"1714120790","digg_num":"899","digg_time":"1713985086","forbidcomment":"0","ifvote":"0","heart":"","htmlname":"","city_id":"0"},"page":"1"}