Java >> Java opplæring >  >> Tag >> Spring

RabbitMQ – Behandler meldinger serielt ved hjelp av Spring-integrering Java DSL

Hvis du noen gang har behov for å behandle meldinger serielt med RabbitMQ med en gruppe lyttere som behandler meldingene, er den beste måten jeg har sett på å bruke et "eksklusivt forbruker"-flagg på en lytter med 1 tråd på hver lytter som behandler meldingene.

Eksklusivt forbrukerflagg sikrer at kun 1 forbruker kan lese meldinger fra den spesifikke køen, og 1 tråd på den forbrukeren sørger for at meldingene behandles serielt. Det er imidlertid en hake, jeg skal gå over det senere.

La meg demonstrere denne oppførselen med en Spring Boot og Spring Integration-basert RabbitMQ-meldingsforbruker.

For det første er dette konfigurasjonen for å sette opp en kø med Spring java-konfigurasjon, merk at siden dette er en Spring Boot-applikasjon, oppretter den automatisk en RabbitMQ-tilkoblingsfabrikk når Spring-amqp-biblioteket legges til listen over avhengigheter:

@Configuration
@Configuration
public class RabbitConfig {

    @Autowired
    private ConnectionFactory rabbitConnectionFactory;

    @Bean
    public Queue sampleQueue() {
        return new Queue("sample.queue", true, false, false);
    }

}

Gitt denne prøvekøen, en lytter som får meldingene fra denne køen og behandler dem ser slik ut, flyten er skrevet ved hjelp av den utmerkede Spring-integreringen Java DSL bibliotek:

@Configuration
public class RabbitInboundFlow {
    private static final Logger logger = LoggerFactory.getLogger(RabbitInboundFlow.class);

    @Autowired
    private RabbitConfig rabbitConfig;

    @Autowired
    private ConnectionFactory connectionFactory;

    @Bean
    public SimpleMessageListenerContainer simpleMessageListenerContainer() {
        SimpleMessageListenerContainer listenerContainer = new SimpleMessageListenerContainer();
        listenerContainer.setConnectionFactory(this.connectionFactory);
        listenerContainer.setQueues(this.rabbitConfig.sampleQueue());
        listenerContainer.setConcurrentConsumers(1);
        listenerContainer.setExclusive(true);
        return listenerContainer;
    }

    @Bean
    public IntegrationFlow inboundFlow() {
        return IntegrationFlows.from(Amqp.inboundAdapter(simpleMessageListenerContainer()))
                .transform(Transformers.objectToString())
                .handle((m) -> {
                    logger.info("Processed  {}", m.getPayload());
                })
                .get();
    }

}

Flyten er veldig konsist uttrykt i inboundFlow-metoden, en meldingsnyttelast fra RabbitMQ transformeres fra byte-array til String og behandles til slutt ved å logge meldingen til loggene.

Den viktige delen av flyten er lytterkonfigurasjonen, legg merke til flagget som setter forbrukeren til å være en eksklusiv forbruker, og innenfor denne forbrukeren er antallet tråder som behandles satt til 1. Gitt dette selv om flere forekomster av applikasjonen bare startes opp 1 av lytterne vil kunne koble til og behandle meldinger.

Nå for fangsten, vurder et tilfelle der behandlingen av meldinger tar en stund å fullføre og ruller tilbake under behandlingen av meldingen. Hvis forekomsten av applikasjonen som håndterer meldingen skulle stoppes midt i behandlingen av en slik melding, så er oppførselen en annen instans vil begynne å håndtere meldingene i køen, når den stoppede forekomsten ruller tilbake meldingen, rullet tilbake meldingen blir deretter levert til den nye eksklusive forbrukeren, og får dermed en melding ute av drift.

  • Hvis du er interessert i å utforske dette videre, her er et github-prosjekt for å leke med denne funksjonen:https://github.com/bijukunjummen/test-rabbit-exclusive.

Java Tag