RabbitMQ – Behandler meldinger serielt ved hjelp av Spring-integrering Java DSL
Hvis du noen gang har behov for å behandle meldinger serielt med RabbitMQ med en gruppe lyttere som behandler meldingene, er den beste måten jeg har sett på å bruke et "eksklusivt forbruker"-flagg på en lytter med 1 tråd på hver lytter som behandler meldingene.
Eksklusivt forbrukerflagg sikrer at kun 1 forbruker kan lese meldinger fra den spesifikke køen, og 1 tråd på den forbrukeren sørger for at meldingene behandles serielt. Det er imidlertid en hake, jeg skal gå over det senere.
La meg demonstrere denne oppførselen med en Spring Boot og Spring Integration-basert RabbitMQ-meldingsforbruker.
For det første er dette konfigurasjonen for å sette opp en kø med Spring java-konfigurasjon, merk at siden dette er en Spring Boot-applikasjon, oppretter den automatisk en RabbitMQ-tilkoblingsfabrikk når Spring-amqp-biblioteket legges til listen over avhengigheter:
@Configuration @Configuration public class RabbitConfig { @Autowired private ConnectionFactory rabbitConnectionFactory; @Bean public Queue sampleQueue() { return new Queue("sample.queue", true, false, false); } }
Gitt denne prøvekøen, en lytter som får meldingene fra denne køen og behandler dem ser slik ut, flyten er skrevet ved hjelp av den utmerkede Spring-integreringen Java DSL bibliotek:
@Configuration public class RabbitInboundFlow { private static final Logger logger = LoggerFactory.getLogger(RabbitInboundFlow.class); @Autowired private RabbitConfig rabbitConfig; @Autowired private ConnectionFactory connectionFactory; @Bean public SimpleMessageListenerContainer simpleMessageListenerContainer() { SimpleMessageListenerContainer listenerContainer = new SimpleMessageListenerContainer(); listenerContainer.setConnectionFactory(this.connectionFactory); listenerContainer.setQueues(this.rabbitConfig.sampleQueue()); listenerContainer.setConcurrentConsumers(1); listenerContainer.setExclusive(true); return listenerContainer; } @Bean public IntegrationFlow inboundFlow() { return IntegrationFlows.from(Amqp.inboundAdapter(simpleMessageListenerContainer())) .transform(Transformers.objectToString()) .handle((m) -> { logger.info("Processed {}", m.getPayload()); }) .get(); } }
Flyten er veldig konsist uttrykt i inboundFlow-metoden, en meldingsnyttelast fra RabbitMQ transformeres fra byte-array til String og behandles til slutt ved å logge meldingen til loggene.
Den viktige delen av flyten er lytterkonfigurasjonen, legg merke til flagget som setter forbrukeren til å være en eksklusiv forbruker, og innenfor denne forbrukeren er antallet tråder som behandles satt til 1. Gitt dette selv om flere forekomster av applikasjonen bare startes opp 1 av lytterne vil kunne koble til og behandle meldinger.
Nå for fangsten, vurder et tilfelle der behandlingen av meldinger tar en stund å fullføre og ruller tilbake under behandlingen av meldingen. Hvis forekomsten av applikasjonen som håndterer meldingen skulle stoppes midt i behandlingen av en slik melding, så er oppførselen en annen instans vil begynne å håndtere meldingene i køen, når den stoppede forekomsten ruller tilbake meldingen, rullet tilbake meldingen blir deretter levert til den nye eksklusive forbrukeren, og får dermed en melding ute av drift.
- Hvis du er interessert i å utforske dette videre, her er et github-prosjekt for å leke med denne funksjonen:https://github.com/bijukunjummen/test-rabbit-exclusive.