Java >> Java tutorial >  >> Java

Parallel udførelse af blokeringsopgaver med RxJava og Completable

" Hvordan parallel udførelse af blokering af "kun bivirkning" (alias ugyldige) opgaver blev lettere med Completable abstraktion introduceret i RxJava 1.1.1. "

Som du måske har bemærket ved at læse min blog, har jeg primært specialiseret mig i softwarehåndværk og automatisk kodetest. Men derudover er jeg entusiast for Continuous Delivery og bredt defineret samtidighed. Det sidste punkt spænder fra rene tråde og semaforer i C til mere højniveauløsninger som ReactiveX og skuespillermodellen. Denne gang en use case for en meget praktisk (i specifikke tilfælde) funktion introduceret i den helt nye RxJava 1.1.1 – rx.Completable . På samme måde som mange mine blogindlæg er denne også en afspejling af den faktiske begivenhed, jeg stødte på, når jeg arbejdede med rigtige opgaver og use cases.

En opgave at udføre

Forestil dig et system med ret kompleks behandling af asynkrone hændelser, der kommer fra forskellige kilder. Filtrering, sammenlægning, transformation, gruppering, berigelse og mere. RxJava passer meget godt her, især hvis vi vil være reaktive. Lad os antage, at vi allerede har implementeret det (ser ud og fungerer godt), og der er kun en ting mere tilbage. Inden vi starter behandlingen, er det påkrævet at fortælle 3 eksterne systemer, at vi er klar til at modtage beskeder. 3 synkrone opkald til ældre systemer (via RMI, JMX eller SOAP). Hver af dem kan vare i et antal sekunder, og vi skal vente på dem alle, før vi starter. Heldigvis er de allerede implementeret, og vi behandler dem som sorte kasser, der kan lykkes (eller mislykkes med en undtagelse). Vi skal bare ringe til dem (helst samtidig) og vente på, at de er færdige.

rx.Observable – tilgang 1

Med RxJava lige ved hånden, ligner det den indlysende tilgang. For det første kan jobudførelse ombrydes med Observable :

private Observable<Void> rxJobExecute(Job job) {
    return Observable.fromCallable(() -> { 
        job.execute();
        return null;
    });
}

Desværre (i vores tilfælde) Observable forventer at få nogle elementer returneret. Vi skal bruge Void og akavet return null (i stedet for blot metodehenvisning job::execute .

Dernæst kan vi bruge subscribeOn() metode til at bruge en anden tråd til at udføre vores job (og ikke blokere den primære/aktuelle tråd – vi ønsker ikke at udføre vores job sekventielt). Schedulers.io() giver en skemalægger et sæt tråde beregnet til IO-bundet arbejde.

Observable<Void> run1 = rxJobExecute(job1).subscribeOn(Schedulers.io());
Observable<Void> run2 = rxJobExecute(job2).subscribeOn(Schedulers.io());

Til sidst skal vi vente på, at de alle er færdige (alle Obvervable s at fuldføre). For at gøre det kan en zip-funktion tilpasses. Den kombinerer elementer udsendt af zippet Obserbable s efter deres sekvensnummer. I vores tilfælde er vi kun interesseret i det første pseudo-element fra hvert job Observable (vi udsender kun null for at tilfredsstille API) og vent på dem på en blokerende måde. En zip-funktion i en zip-operator skal returnere noget, og derfor skal vi gentage en løsning med null .

Observable.zip(run1, run2, (r1, r2) -> return null)
         .toBlocking()
         .single();

Det er ret synligt, at Observable blev designet til at arbejde med strømme af værdier, og der er noget ekstra arbejde, der kræves for at justere det til operationer, der kun giver bivirkninger (ikke returnerer noget). Situationen bliver endnu værre, når vi bliver nødt til at kombinere (f.eks. flette) vores operation, der kun har bivirkninger, med andre, der returnerer nogle værdi(er) – en grimmere rollebesætning er påkrævet. Se den virkelige use case fra RxNetty API.

public void execute() {
    Observable<Void> run1 = rxJobExecute(job1).subscribeOn(Schedulers.io());
    Observable<Void> run2 = rxJobExecute(job2).subscribeOn(Schedulers.io());

    Observable.zip(run1, run2, (r1, r2) -> null)
        .toBlocking()
        .single();
}

private Observable<Void> rxJobExecute(Job job) {
    return Observable.fromCallable(() -> { 
        job.execute();
        return null;
    });
}

rx.Observable – approach 2

Der kunne bruges en anden tilgang. I stedet for at generere en kunstig genstand, en tom Observable med vores opgave kan udføres som en onComplete handling. Dette tvinger os til at skifte fra zip operation til merge . Som et resultat skal vi angive en onNext handling (som aldrig udføres for tom Observable ), hvilket bekræfter os i overbevisning, at vi forsøger at hacke systemet.

public void execute() {
    Observable<Object> run1 = rxJobExecute(job1).subscribeOn(Schedulers.io());
    Observable<Object> run2 = rxJobExecute(job2).subscribeOn(Schedulers.io());

    Observable.merge(run1, run2)
            .toBlocking()
            .subscribe(next -> {});
}

private Observable<Object> rxJobExecute(Job job) {
    return Observable.empty()
            .doOnCompleted(job::execute);
}

rx.Completable

Bedre understøttelse af Observable, der ikke returnerer nogen værdi, er blevet behandlet i RxJava 1.1.1. Completable kan betragtes som en strippet version af Observable som enten kan afsluttes med succes (onCompleted hændelse udsendes) eller mislykkes (onError ). Den nemmeste måde at oprette en Completable på instans bruger en fromAction metode, som tager en Action0 som ikke returnerer nogen værdi (såsom Runnable ).

Completable completable1 = Completable.fromAction(job1::execute)
        .subscribeOn(Schedulers.io());
Completable completable2 = Completable.fromAction(job2::execute)
        .subscribeOn(Schedulers.io());

Dernæst kan vi bruge merge() metode, som returnerer en Completable instans, der abonnerer på alle downstream Completable s på én gang og afsluttes, når dem alle er færdige (eller en af ​​dem mislykkes). Som vi brugte subscribeOn metode med en ekstern planlægger udføres alle job parallelt (i forskellige tråde).

Completable.merge(completable1, completable2)
        .await();

await() metodeblokeringer, indtil alle opgaver er afsluttet (i tilfælde af fejl vil en undtagelse blive gendannet). Rent og enkelt.

public void execute() {
    Completable completable1 = Completable.fromAction(job1::execute)
            .subscribeOn(Schedulers.io());
    Completable completable2 = Completable.fromAction(job2::execute)
            .subscribeOn(Schedulers.io());

    Completable.merge(completable1, completable2)
        .await();
}

java.util.concurrent.CompletableFuture

Nogle af jer kunne spørge:Hvorfor ikke bare bruge CompletableFuture ? Det ville være et godt spørgsmål. Hvorimod ren Future introduceret i Java 5 kan det kræve yderligere arbejde fra vores side, ListenableFuture (fra Guava) og CompletableFuture (fra Java 8) gør det ret trivielt.

Først skal vi køre/planlægge udførelse af vores job. Brug derefter CompletableFuture.allOf() metode kan vi oprette en ny CompletableFuture som er afsluttet i det øjeblik, alle job er færdige (har vi ikke set den opfattelse før?). get() metode blokerer bare for at vente på det.

public void execute() {
    try {
        CompletableFuture<Void> run1 = CompletableFuture.runAsync(job1::execute);
        CompletableFuture<Void> run2 = CompletableFuture.runAsync(job2::execute);

        CompletableFuture.allOf(run1, run2)
            .get();

    } catch (InterruptedException | ExecutionException e) {
        throw new RuntimeException("Jobs execution failed", e);
    }
}

Vi skal gøre noget med kontrollerede undtagelser (meget ofte ønsker vi ikke at forurene vores API med dem), men generelt ser det fornuftigt ud. Det er dog værd at huske at CompletableFuture kommer til kort, når mere kompleks kædebehandling er påkrævet. Ud over at have RxJava allerede brugt i vores projekt er det ofte nyttigt at bruge den samme (eller lignende) API i stedet for at introducere noget helt nyt.

Oversigt

Takket være rx.Completable en udførelse af opgaver med kun bivirkning (ikke returnerer noget) med RxJava er meget mere behagelig. I codebase, der allerede bruger RxJava, kunne det være en foretrukket frem for CompletableFuture selv i simple tilfælde. Dog Completable giver mange flere avancerede operatører og teknikker og kan desuden nemt blandes med Observable hvad der gør den endnu mere kraftfuld.

For at læse mere om Completable du vil måske se udgivelsesbemærkningerne. For dem, der ønsker at have en dybere indsigt i emnet, er der en meget detaljeret introduktion til Completable API på Advanced RxJava blog (del 1 og 2).

  • Kildekoden til kodeeksempler er tilgængelig fra GitHub.

Btw, hvis du er interesseret i RxJava generelt, kan jeg med god samvittighed anbefale dig en bog, som er ved at blive skrevet af Tomasz Nurkiewicz og Ben Christensen – Reactive Programming with RxJava.

Java tag