Java >> Java tutorial >  >> Tag >> Netty

Rx-netty og Karyon2-baseret cloud-klar mikroservice

Netflix Karyon giver en ren ramme til at skabe cloud-klare mikrotjenester. Hvis du i din organisation bruger Netflix OSS-stakken bestående af Eureka til registrering og opdagelse af tjenester, Archaius til ejendomsadministration, så bruger du højst sandsynligt Karyon til at oprette dine mikrotjenester.

Karyon har gennemgået en del ændringer for nylig, og mit mål her er at dokumentere en god prøve ved hjælp af den nyere version af Karyon. Den gamle Karyon(kald det Karyon1) var baseret på JAX-RS 1.0 Specs med Jersey som implementering, den nyere version af Karyon(Karyon2) understøtter stadig Jersey, men opfordrer også til brugen af ​​RX-Netty, som er en tilpasset version af Netty med understøttelse af Rx-java.

Med det sagt, lad mig springe ind i en prøve. Mit mål med denne prøve er at skabe en "pong"-mikrotjeneste, som tager en "POST"-besked og returnerer en "Anerkendelse"

Følgende er en prøveanmodning:

{
"id": "id",
"payload":"Ping"
}

Og et forventet svar:

{"id":"id","received":"Ping","payload":"Pong"}

Det første trin er at oprette en RequestHandler, der, som navnet antyder, er en RX-Netty-komponent, der beskæftiger sig med at dirigere den indgående anmodning:

package org.bk.samplepong.app;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.reactivex.netty.protocol.http.server.HttpServerRequest;
import io.reactivex.netty.protocol.http.server.HttpServerResponse;
import io.reactivex.netty.protocol.http.server.RequestHandler;
import netflix.karyon.transport.http.health.HealthCheckEndpoint;
import org.bk.samplepong.domain.Message;
import org.bk.samplepong.domain.MessageAcknowledgement;
import rx.Observable;

import java.io.IOException;
import java.nio.charset.Charset;


public class RxNettyHandler implements RequestHandler<ByteBuf, ByteBuf> {

    private final String healthCheckUri;
    private final HealthCheckEndpoint healthCheckEndpoint;
    private final ObjectMapper objectMapper = new ObjectMapper();

    public RxNettyHandler(String healthCheckUri, HealthCheckEndpoint healthCheckEndpoint) {
        this.healthCheckUri = healthCheckUri;
        this.healthCheckEndpoint = healthCheckEndpoint;
    }

    @Override
    public Observable<Void> handle(HttpServerRequest<ByteBuf> request, HttpServerResponse<ByteBuf> response) {
        if (request.getUri().startsWith(healthCheckUri)) {
            return healthCheckEndpoint.handle(request, response);
        } else if (request.getUri().startsWith("/message") && request.getHttpMethod().equals(HttpMethod.POST)) {
            return request.getContent().map(byteBuf -> byteBuf.toString(Charset.forName("UTF-8")))
                    .map(s -> {
                        try {
                            Message m = objectMapper.readValue(s, Message.class);
                            return m;
                        } catch (IOException e) {
                            throw new RuntimeException(e);
                        }
                    })
                    .map(m -> new MessageAcknowledgement(m.getId(), m.getPayload(), "Pong"))
                    .flatMap(ack -> {
                                try {
                                    return response.writeStringAndFlush(objectMapper.writeValueAsString(ack));
                                } catch (Exception e) {
                                    response.setStatus(HttpResponseStatus.BAD_REQUEST);
                                    return response.close();
                                }
                            }
                    );
        } else {
            response.setStatus(HttpResponseStatus.NOT_FOUND);
            return response.close();
        }
    }
}

Dette flow er fuldstændig asynkront og internt styret af RX-java-bibliotekerne, Java 8 Lambda-udtryk hjælper også med at gøre koden kortfattet. Det ene problem, du vil se her, er, at routing-logikken (hvilken uri til hvilken controller) er blandet sammen med den faktiske controller-logik, og jeg tror, ​​at dette bliver rettet.

Givet denne RequestHandler kan en server startes op i et selvstændigt java-program, ved at bruge rå RX-Netty på denne måde, dette er i bund og grund det, et slutpunkt vil blive hentet op på port 8080 for at håndtere anmodningerne:

public final class RxNettyExample {

    public static void main(String... args) throws Exception {
        final ObjectMapper objectMapper = new ObjectMapper();
        RxNettyHandler handler = new RxNettyHandler();

        HttpServer<ByteBuf, ByteBuf> server = RxNetty.createHttpServer(8080, handler);

        server.start();

Dette er imidlertid den native Rx-netty måde, for en cloud-klar mikrotjeneste skal der ske et par ting, tjenesten skal registreres hos Eureka og svare på sundhedstjekket tilbage fra Eureka og skulle være i stand til at indlæse egenskaber ved hjælp af Archaius .

Så med Karyon2 ser opstarten i et hovedprogram lidt anderledes ud:

package org.bk.samplepong.app;

import netflix.adminresources.resources.KaryonWebAdminModule;
import netflix.karyon.Karyon;
import netflix.karyon.KaryonBootstrapModule;
import netflix.karyon.ShutdownModule;
import netflix.karyon.archaius.ArchaiusBootstrapModule;
import netflix.karyon.eureka.KaryonEurekaModule;
import netflix.karyon.servo.KaryonServoModule;
import netflix.karyon.transport.http.health.HealthCheckEndpoint;
import org.bk.samplepong.resource.HealthCheck;

public class SamplePongApp {

    public static void main(String[] args) {
        HealthCheck healthCheckHandler = new HealthCheck();
        Karyon.forRequestHandler(8888,
                new RxNettyHandler("/healthcheck",
                        new HealthCheckEndpoint(healthCheckHandler)),
                new KaryonBootstrapModule(healthCheckHandler),
                new ArchaiusBootstrapModule("sample-pong"),
                KaryonEurekaModule.asBootstrapModule(),
                Karyon.toBootstrapModule(KaryonWebAdminModule.class),
                ShutdownModule.asBootstrapModule(),
                KaryonServoModule.asBootstrapModule()
        ).startAndWaitTillShutdown();
    }
}

Nu er det i det væsentlige klar til skyen, denne version af programmet ved opstart vil registreres rent hos Eureka og afsløre et sundhedstjek-slutpunkt. Det afslører desuden et pænt sæt admin-slutpunkter ved port 8077.

Konklusion

Jeg håber, at dette giver en god introduktion til at bruge Karyon2 til at udvikle Netflix OSS baseret. Hele prøven er tilgængelig på min github-repo her:https://github.com/bijukunjummen/sample-ping-pong-netflixoss/tree/master/sample-pong. Som en opfølgning vil jeg vise, hvordan den samme tjeneste kan udvikles ved hjælp af spring-cloud, som er forårets måde at skabe mikrotjenester på.


Java tag