/**
 * Creates a Kafka topic through ZooKeeper using the legacy {@code AdminUtils} API.
 *
 * <p>The topic name, partition count and replication factor are read from
 * {@code bean}, which is not declared in this method (presumably a field of
 * the enclosing class -- confirm). The topic is created with an empty
 * topic-level configuration and the enforced rack-aware mode.
 *
 * <p>The ZooKeeper session opened here is always closed before returning,
 * including when topic creation throws.
 */
public static void createKafaTopic() {
    // NOTE(review): the octets 331 and 341 are not valid IPv4 values; these look
    // like placeholder addresses -- confirm the real ZooKeeper quorum.
    // The two 30000 arguments are the session and connection timeouts (ms).
    ZkUtils zkUtils = ZkUtils.apply(
            "172.30.251.331:2181,172.30.251.341:2181", 30000, 30000,
            JaasUtils.isZkSecurityEnabled());
    try {
        // Enforced$ is a Scala case object: use its singleton instead of
        // instantiating the module class from Java.
        AdminUtils.createTopic(zkUtils, bean.getTopic(),
                bean.getPartition(), bean.getReplication(),
                new Properties(), RackAwareMode.Enforced$.MODULE$);
    } finally {
        // Release the ZooKeeper connection so repeated calls do not leak sessions.
        zkUtils.close();
    }
}
// 2: 实现 Kafka 删除 topic (deleting a Kafka topic)
private static ResultDTO deleteTopic(ZkUtils zkUtils2, KafkaTopicBean bean) {
ResultDTO DAO = null;
boolean isCheck = checkDeleteTopic(bean);
if (isCheck) {
if (Topic.isInternal(bean.getTopic())) {
DAO = new ResultDTO(
Global_Constant.DTO_ERROR_CODE,
"Topic %s is a kafka internal topic and is not allowed to be marked for deletion");
} else {
zkUtils.createPersistentPath(
zkUtils.getDeleteTopicPath(bean.getTopic()), null,
zkUtils.DefaultAcls());
DAO = new ResultDTO(Global_Constant.DTO_SUCCESS_CODE,
"success in delete topic");
}
} else {
DAO = new ResultDTO(Global_Constant.DTO_ERROR_CODE,
"delete topic but topic name is empty");
}
return DAO;