Java >> Java tutorial >  >> Tag >> Netty

Få adgang til Meetups streaming-API med RxNetty

Denne artikel vil berøre flere emner:reaktiv programmering, HTTP, parsing af JSON og integration med social API. Alt i én brugssituation:vi indlæser og behandler nye meetup.com-begivenheder i realtid via ikke-blokerende RxNetty-bibliotek, der kombinerer kraften fra Netty-rammeværket og fleksibiliteten i RxJava-biblioteket. Meetup leverer offentligt tilgængelig streaming API, der skubber hver eneste Meetup, der er registreret over hele verden, i realtid. Bare browse til stream.meetup.com/2/open_events og observer, hvordan bidder af JSON langsomt dukker op på din skærm. Hver gang nogen opretter en ny begivenhed, skubbes selvstændig JSON fra serveren til din browser. Det betyder, at en sådan anmodning aldrig slutter, i stedet bliver vi ved med at modtage delvise data, så længe vi ønsker. Vi har allerede undersøgt lignende scenarie i Turning Twitter4J into RxJava's Observable . Hver ny mødebegivenhed udgiver et selvstændigt JSON-dokument, der ligner dette (mange detaljer udeladt):

{ "id" : "219088449",
  "name" : "Silver Wings Brunch",
  "time" : 1421609400000,
  "mtime" : 1417814004321,
  "duration" : 900000,
  "rsvp_limit" : 0,
  "status" : "upcoming",
  "event_url" : "http://www.meetup.com/Laguna-Niguel-Social-Networking-Meetup/events/219088449/",
  "group" : { "name" : "Former Flight Attendants South Orange and North San Diego Co",
              "state" : "CA"
              ...
  },
  "venue" : { "address_1" : "26860 Ortega Highway",
              "city" : "San Juan Capistrano",
              "country" : "US"
              ...
  },
  "venue_visibility" : "public",
  "visibility" : "public",
  "yes_rsvp_count" : 1
  ...
}

Hver gang vores lange polling HTTP-forbindelse (med Transfer-Encoding: chunked respons header) skubber et sådant stykke JSON, vi vil parse det og på en eller anden måde sende videre. Vi hader tilbagekald, så RxJava virker som et rimeligt alternativ (tænk:Observable<Event> ).

Trin 1:Modtagelse af rådata med RxNetty

Vi kan ikke bruge almindelige HTTP-klienter, da de er fokuseret på request-response semantik. Der er intet svar her, vi forlader simpelthen en åben forbindelse for evigt og forbruger data, når det kommer. RxJava har et "out-of-the-box" RxApacheHttp-bibliotek, men det antager text/event-stream indholdstype. I stedet vil vi bruge et meget lavt, alsidigt RxNetty-bibliotek. Det er en indpakning omkring Netty (duh!) og er i stand til at implementere vilkårlige TCP/IP (inklusive HTTP) og UDP-klienter og -servere. Hvis du ikke kender Netty, er det pakke- snarere end stream-orienteret, så vi kan forvente en Netty-begivenhed for hvert Meetup-push. API'et er bestemt ikke ligetil, men det giver mening, når du først har gjort det:

HttpClient<ByteBuf, ByteBuf> httpClient = RxNetty.<ByteBuf, ByteBuf>newHttpClientBuilder("stream.meetup.com", 443)
        .pipelineConfigurator(new HttpClientPipelineConfigurator<>())
        .withSslEngineFactory(DefaultFactories.trustAll())
        .build();
 
final Observable<HttpClientResponse> responses = 
    httpClient.submit(HttpClientRequest.createGet("/2/open_events"));
final Observable byteBufs = 
    responses.flatMap(AbstractHttpContentHolder::getContent);
final Observable chunks = 
    byteBufs.map(content -> content.toString(StandardCharsets.UTF_8));

Først opretter vi HttpClient og opsæt SSL (husk at trustAll() med hensyn til servercertifikater er nok ikke den bedste produktionsindstilling). Senere visubmit() GET-anmodning og modtag Observable<HttpClientResponse<ByteBuf>> til gengæld. ByteBuf er Nettys abstraktion over en masse bytes sendt eller modtaget over ledningen. Denne observerbare vil straks fortælle os om hvert stykke data modtaget fra Meetup. Efter at have udtrukket ByteBuf fra svar gør vi det til en String indeholdende førnævnte JSON. Så langt så godt, det virker.

Trin 2:Justering af pakker med JSON-dokumenter

Netty er meget kraftfuld, fordi den ikke skjuler iboende kompleksitet over utætte abstraktioner. Hver gang noget modtages over TCP/IP-ledningen, får vi besked. Du tror måske, at når serveren sender 100 bytes, vil Netty på klientsiden give os besked om disse modtagne 100 bytes. TCP/IP-stak er dog fri til at opdele og flette data, du sender over tråd, især da det formodes at være en stream, så hvordan det er opdelt i pakker burde være irrelevant. Denne advarsel er meget forklaret i Nettys dokumentation. Hvad betyder det for os? Når Meetup sender en enkelt begivenhed, modtager vi muligvis kun én String i chunks observerbar. Men lige så godt kan det opdeles i vilkårligt antal pakker, således chunks vil udsende flere String s. Endnu værre, hvis Meetup sender to begivenheder lige efter hinanden, kan de passe i én pakke. I så faldchunks vil udsende én String med to uafhængige JSON-dokumenter. Faktisk kan vi ikke antage nogen tilpasning mellem JSON-strenge og modtagede netværkspakker. Alt, hvad vi ved, er, at individuelle JSON-dokumenter, der repræsenterer begivenheder, er adskilt af nye linjer. Utroligt nok, RxJavaString officiel tilføjelse har en metode til netop det:

Observable jsonChunks = StringObservable.split(chunks, "\n");

Faktisk er der endnu enklere StringObservable.byLine(chunks) , men den bruger platform-afhængig end-of-line. Hvad split() gør er bedst forklaret i officiel dokumentation:

Nu kan vi sikkert parse hver String udsendes af jsonChunks :

Trin 3:Parsing af JSON

Interessant nok er dette trin ikke så ligetil. Jeg indrømmer, jeg en slags nød WSDL-tider, fordi jeg nemt og forudsigeligt kunne generere Java-model, der følger web-services kontrakt. JSON, især når man tager JSON-skemaets marginale markedspenetration, er dybest set integrationens vilde vesten. Typisk står du tilbage med uformel dokumentation eller prøver af anmodninger og svar. Ingen typeoplysninger eller format, om felter er obligatoriske osv. Desuden fordi jeg modvilligt arbejder med kort over kort (hej med Clojure-programmører), for at kunne arbejde med JSON-baserede REST-tjenester skal jeg selv skrive kortlægnings-POJO'er. Nå, der er løsninger. Først tog jeg et repræsentativt eksempel på JSON produceret af Meetup streaming API og placerede det i src/main/json/meetup/event.json . Så brugte jeg jsonschema2pojo-maven-plugin (Gradle og Ant versioner findes også). Plugin's navn er forvirrende, det kan også fungere med JSON-eksempel, ikke kun skema, til at producere Java-modeller:

<plugin>
    <groupId>org.jsonschema2pojo</groupId>
    <artifactId>jsonschema2pojo-maven-plugin</artifactId>
    <version>0.4.7</version>
    <configuration>
        <sourceDirectory>${basedir}/src/main/json/meetup</sourceDirectory>
        <targetPackage>com.nurkiewicz.meetup.generated</targetPackage>
        <includeHashcodeAndEquals>true</includeHashcodeAndEquals>
        <includeToString>true</includeToString>
        <initializeCollections>true</initializeCollections>
        <sourceType>JSON</sourceType>
        <useCommonsLang3>true</useCommonsLang3>
        <useJodaDates>true</useJodaDates>
        <useLongIntegers>true</useLongIntegers>
        <outputDirectory>target/generated-sources</outputDirectory>
    </configuration>
    <executions>
        <execution>
            <id>generate-sources</id>
            <phase>generate-sources</phase>
            <goals>
                <goal>generate</goal>
            </goals>
        </execution>
    </executions>
</plugin>

På dette tidspunkt vil Maven oprette Event.java , Venue.java , Group.java , osv. kompatibel med Jackson:

private Event parseEventJson(String jsonStr) {
    try {
        return objectMapper.readValue(jsonStr, Event.class);
    } catch (IOException e) {
        throw new UncheckedIOException(e);
    }
}

Det virker bare, søde:

final Observable events = jsonChunks.map(this::parseEventJson);

Trin 4:??? [1]

Trin 5:PROFIT!!!

Med Observable<Event> vi kan implementere nogle virkelig interessante use cases. Vil du finde navne på alle møder i Polen, der lige er blevet oprettet? Klart!

events
        .filter(event -> event.getVenue() != null)
        .filter(event -> event.getVenue().getCountry().equals("pl"))
        .map(Event::getName)
        .forEach(System.out::println);

Leder du efter statistik, hvor mange begivenheder oprettes i minuttet? Intet problem!

events
        .buffer(1, TimeUnit.MINUTES)
        .map(List::size)
        .forEach(count -> log.info("Count: {}", count));

Eller måske vil du løbende søge efter møder længst ude i fremtiden og springe dem tættere på end dem, der allerede er fundet?

events
        .filter(event -> event.getTime() != null)
        .scan(this::laterEventFrom)
        .distinct()
        .map(Event::getTime)
        .map(Instant::ofEpochMilli)
        .forEach(System.out::println);
 
//...
 
private Event laterEventFrom(Event first, Event second) {
    return first.getTime() > second.getTime() ?
            first :
            second;
}

Denne kode bortfiltrerer hændelser uden kendt tid, udsender enten nuværende hændelse eller den forrige (scan() ), afhængigt af hvilken der var senere, filtrerer dubletter fra og viser tid. Dette lille program, der kører i få minutter, har allerede fundet et netop oprettet møde, der er planlagt til november 2015 - og det er december 2014, når dette skrives. Mulighederne er uendelige.

Jeg håber, at jeg gav dig en god forståelse af, hvordan du nemt kan blande forskellige teknologier sammen:reaktiv programmering til at skrive superhurtig netværkskode, typesikker JSON-parsing uden kedelkode og RxJava til hurtigt at behandle strømme af begivenheder. God fornøjelse!

No
Java tag