Properties props =newProperties();
props.put("bootstrap.servers","localhost:9092");
props.put("transactional.id","my-transactional-id");
Producer<String, String> producer =newKafkaProducer<>(props,newStringSerializer(),newStringSerializer());
producer.initTransactions();try{
producer.beginTransaction();for(int i =0; i <100; i++)
producer.send(newProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i)));
producer.commitTransaction();}catch(ProducerFencedException| OutOfOrderSequenceException | AuthorizationException e){// We can't recover from these exceptions, so our only option is to close the producer and exit.
producer.close();}catch(KafkaException e){// For all other exceptions, just abort the transaction and try again.
producer.abortTransaction();}
producer.close();