Java >> Java tutorial >  >> Java

Brug reactive streams API til at kombinere akka-streams med rxJava

Bare en hurtig artikel denne gang, da jeg stadig eksperimenterer med disse ting. Der er meget snak omkring reaktiv programmering. I Java 8 har vi Stream API, vi har rxJava vi har ratpack og Akka har akka-streams.

Hovedproblemet med disse implementeringer er, at de ikke er kompatible. Du kan ikke forbinde abonnenten af ​​en implementering til udgiveren af ​​en anden. Heldigvis er der begyndt et initiativ for at give en måde, hvorpå disse forskellige implementeringer kan arbejde sammen:



"Det er hensigten med denne specifikation at tillade skabelsen af ​​mange overensstemmende implementeringer, som i kraft af at overholde reglerne vil være i stand til at fungere gnidningsløst og bevare de førnævnte fordele og egenskaber på tværs af hele behandlingsgrafen for en stream-applikation."

Fra – http://www.reactive-streams.org/

Hvordan fungerer dette

Hvordan gør vi det nu? Lad os se på et hurtigt eksempel baseret på akka-stream-eksemplerne (herfra). I følgende liste:

package sample.stream
 
import akka.actor.ActorSystem
import akka.stream.FlowMaterializer
import akka.stream.scaladsl.{SubscriberSink, PublisherSource, Source}
import com.google.common.collect.{DiscreteDomain, ContiguousSet}
import rx.RxReactiveStreams
import rx.Observable;
import scala.collection.JavaConverters._
 
object BasicTransformation {
 
  def main(args: Array[String]): Unit = {
 
    // define an implicit actorsystem and import the implicit dispatcher
    implicit val system = ActorSystem("Sys")
    import system.dispatcher
 
    // flow materializer determines how the stream is realized.
    // this time as a flow between actors.
    implicit val materializer = FlowMaterializer()
 
    // input text for the stream.
    val text =
      """|Lorem Ipsum is simply dummy text of the printing and typesetting industry.
         |Lorem Ipsum has been the industry's standard dummy text ever since the 1500s, 
         |when an unknown printer took a galley of type and scrambled it to make a type 
         |specimen book.""".stripMargin
 
    // create an observable from a simple list (this is in rxjava style)
    val first = Observable.from(text.split("\\s").toList.asJava);
    // convert the rxJava observable to a publisher
    val publisher = RxReactiveStreams.toPublisher(first);
    // based on the publisher create an akka source
    val source = PublisherSource(publisher);
 
    // now use the akka style syntax to stream the data from the source
    // to the sink (in this case this is println)
    source.
      map(_.toUpperCase).                 // executed as actors
      filter(_.length > 3).
      foreach { el =>                     // the sink/consumer
        println(el)
      }.
      onComplete(_ => system.shutdown())  // lifecycle event
  }
}

Kodekommentarerne i dette eksempel forklarer stort set, hvad der sker. Det, vi gør her, er, at vi skaber en rxJava-baseret observerbar. Konverter denne observerbare til en "reactive streams"-udgiver, og brug denne udgiver til at oprette en akka-streams-kilde. For resten af ​​koden kan vi bruge akka-stream style flow API til at modellere strømmen. I dette tilfælde foretager vi bare noget filtrering og udskriver resultatet.

Java tag