Java >> Java-zelfstudie >  >> Tag >> JUnit

Kafka gebruiken met Junit

Een van de handige functies die het uitstekende Spring Kafka-project biedt, afgezien van een gemakkelijker te gebruiken abstractie over onbewerkte Kafka Producer en Consumer, is een manier om Kafka in tests te gebruiken. Het doet dit door een ingebouwde versie van Kafka te bieden die heel gemakkelijk kan worden opgezet en afgebroken.

Het enige dat een project nodig heeft om deze ondersteuning te bieden, is de module "spring-kafka-test", voor een geleidelijke opbouw op de volgende manier:

testCompile "org.springframework.kafka:spring-kafka-test:1.1.2.BUILD-SNAPSHOT"

Merk op dat ik een snapshot-versie van het project gebruik, omdat deze ondersteuning biedt voor Kafka 0.10+.

Met deze afhankelijkheid kan een Embedded Kafka worden gebruikt in een test met behulp van de @ClassRule van JUnit:

@ClassRule
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(2, true, 2, "messages");

Dit zou een Kafka-cluster starten met 2 makelaars, met een onderwerp genaamd "berichten" met 2 partities en de klassenregel zou ervoor zorgen dat een Kafka-cluster wordt geactiveerd voordat de tests worden uitgevoerd en vervolgens wordt afgesloten aan het einde ervan.

Hier is hoe een voorbeeld met Raw Kafka Producer/Consumer eruitziet met behulp van dit ingebedde Kafka-cluster. De embedded Kafka kan worden gebruikt voor het ophalen van de eigenschappen die vereist zijn door de Kafka Producer/Consumer:

Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
KafkaProducer<Integer, String> producer = new KafkaProducer<>(senderProps);
producer.send(new ProducerRecord<>("messages", 0, 0, "message0")).get();
producer.send(new ProducerRecord<>("messages", 0, 1, "message1")).get();
producer.send(new ProducerRecord<>("messages", 1, 2, "message2")).get();
producer.send(new ProducerRecord<>("messages", 1, 3, "message3")).get();


Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("sampleRawConsumer", "false", embeddedKafka);
consumerProps.put("auto.offset.reset", "earliest");

final CountDownLatch latch = new CountDownLatch(4);
ExecutorService executorService = Executors.newSingleThreadExecutor();
executorService.execute(() -> {
    KafkaConsumer<Integer, String> kafkaConsumer = new KafkaConsumer<>(consumerProps);
    kafkaConsumer.subscribe(Collections.singletonList("messages"));
    try {
        while (true) {
            ConsumerRecords<Integer, String> records = kafkaConsumer.poll(100);
            for (ConsumerRecord<Integer, String> record : records) {
                LOGGER.info("consuming from topic = {}, partition = {}, offset = {}, key = {}, value = {}",
                        record.topic(), record.partition(), record.offset(), record.key(), record.value());
                latch.countDown();
            }
        }
    } finally {
        kafkaConsumer.close();
    }
});

assertThat(latch.await(90, TimeUnit.SECONDS)).isTrue();

Een iets uitgebreidere test is hier beschikbaar

Java-tag