Java >> Java tutorial >  >> Java

Java EE 7 Batch Processing og World of Warcraft – Del 1

Dette var en af ​​mine sessioner ved den sidste JavaOne. Dette indlæg vil udvide emnet og se på en rigtig applikation ved hjælp af Batch JSR-352 API. Denne applikation integreres med MMORPG World of Warcraft.

Da JSR-352 er en ny specifikation i Java EE-verdenen, tror jeg, at mange mennesker ikke ved, hvordan man bruger den korrekt. Det kan også være en udfordring at identificere de use cases, som denne specifikation gælder for. Forhåbentlig kan dette eksempel hjælpe dig til bedre at forstå brugstilfældene.


Abstrakt

World of Warcraft er et spil, der spilles af mere end 8 millioner spillere verden over. Tjenesten tilbydes efter region:USA (US) , Europa (EU) , Kina og Korea. Hver region har et sæt servere kaldet Realm som du bruger til at forbinde for at kunne spille spillet. For dette eksempel ser vi kun ind i USA og EU regioner.

En af de mest interessante funktioner ved spillet er, at det giver dig mulighed for at købe og sælge varer i spillet kaldet Items , ved hjælp af et Auktionshus . Hvert rige har to Auktionshus . I gennemsnit hvert rige handler omkring 70.000 Elementer . Lad os knuse nogle tal:

  • 512 Realm's (USA og EU )
  • 70 K Vare pr. Rige
  • Mere end 35 M Vare samlet

Dataene

En anden cool ting ved World of Warcraft er, at udviklerne leverer en REST API for at få adgang til det meste af informationen i spillet, inklusive Auktionshusets data. Tjek hele API'et her.

Auktionshusets data indhentes i to trin. Først skal vi forespørge korrespondenten Auktionshuset Rige REST-slutpunkt for at få en reference til en JSON-fil. Dernæst skal vi få adgang til denne URL og downloade filen med alle Auktionshuset Vare Information. Her er et eksempel:

http://eu.battle.net/api/wow/auction/data/aggra-portugues

Applikationen

Vores mål her er at bygge en applikation, der downloader Auktionshusets , behandle det og udtrække metrics. Disse metrics kommer til at opbygge en historie for Elementerne prisudvikling gennem tiden. Hvem ved? Måske kan vi med disse oplysninger forudsige prisudsving og købe eller sælge varer på de bedste tidspunkter.

Opsætningen

Til opsætningen skal vi bruge et par ekstra ting til Java EE 7:

  • Java EE 7
  • Angular JS
  • Angular ng-gitter
  • UI Bootstrap
  • Google-diagram
  • Vildflue

Jobs

Det vigtigste arbejde, det vil blive udført af Batch JSR-352 Jobs. Et job er en enhed, der indkapsler en hel batchproces. Et job vil blive forbundet via et jobspecifikationssprog. Med JSR-352 er et job blot en beholder til trinene. Den kombinerer flere trin, der logisk hører sammen i et flow.

Vi vil opdele virksomhedens login i tre opgaver:

  • Forbered – Opretter alle de nødvendige understøttende data. Liste over riger , opret mapper til at kopiere filer.
  • Filer – Forespørg riger for at kontrollere, om der er nye filer, der skal behandles.
  • Process – Downloader filen, behandler dataene, udtrækker metrics.

Koden

Back-end – Java EE 7 med Java 8

Det meste af koden vil være i back-end. Vi har brug for Batch JSR-352, men vi kommer også til at bruge en masse andre teknologier fra Java EE:som JPA, JAX-RS, CDI og JSON-P.

Siden Forbered Jobbet er kun at initialisere ansøgningsressourcer til behandlingen, jeg springer det over og dykker ned i de mest interessante dele.

Filer job

Files Job er en implementering af AbstractBatchlet . En batchlet er den enkleste behandlingsstil, der er tilgængelig i batchspecifikationen. Det er et opgaveorienteret trin, hvor opgaven påkaldes én gang, udføres og returnerer en exit-status. Denne type er mest nyttig til at udføre en række opgaver, der ikke er element-orienterede, såsom at udføre en kommando eller udføre filoverførsel. I dette tilfælde er vores Batchlet kommer til at iterere på hvert område lav en REST-anmodning til hver enkelt og hent en URL med filen, der indeholder de data, som vi ønsker at behandle. Her er koden:

LoadAuctionFilesBatchlet

@Named
public class LoadAuctionFilesBatchlet extends AbstractBatchlet {
    @Inject
    private WoWBusiness woWBusiness;

    @Inject
    @BatchProperty(name = "region")
    private String region;
    @Inject
    @BatchProperty(name = "target")
    private String target;

    @Override
    public String process() throws Exception {
        List<Realm> realmsByRegion = woWBusiness.findRealmsByRegion(Realm.Region.valueOf(region));
        realmsByRegion.parallelStream().forEach(this::getRealmAuctionFileInformation);

        return "COMPLETED";
    }

    void getRealmAuctionFileInformation(Realm realm) {
        try {
            Client client = ClientBuilder.newClient();
            Files files = client.target(target + realm.getSlug())
                                .request(MediaType.TEXT_PLAIN).async()
                                .get(Files.class)
                                .get(2, TimeUnit.SECONDS);

            files.getFiles().forEach(auctionFile -> createAuctionFile(realm, auctionFile));
        } catch (Exception e) {
            getLogger(this.getClass().getName()).log(Level.INFO, "Could not get files for " + realm.getRealmDetail());
        }
    }

    void createAuctionFile(Realm realm, AuctionFile auctionFile) {
        auctionFile.setRealm(realm);
        auctionFile.setFileName("auctions." + auctionFile.getLastModified() + ".json");
        auctionFile.setFileStatus(FileStatus.LOADED);

        if (!woWBusiness.checkIfAuctionFileExists(auctionFile)) {
            woWBusiness.createAuctionFile(auctionFile);
        }
    }
}

En fed ting ved dette er brugen af ​​Java 8. Med parallelStream() at påkalde flere REST-anmodninger på én gang er let som en kage! Du kan virkelig mærke forskellen. Hvis du vil prøve det, skal du bare køre prøven og erstatte parallelStream() med stream() og tjek det ud. På min maskine ved hjælp af parallelStream() får opgaven til at udføre omkring 5 eller 6 gange hurtigere.

Opdater
Normalt ville jeg ikke bruge denne tilgang. Jeg har gjort det, fordi en del af logikken involverer at påkalde langsomme REST-anmodninger og parallelStreams virkelig skinner her. At gøre dette ved hjælp af batch-partitioner er muligt, men svært at implementere. Vi skal også samle serverne for nye data hver gang, så det er ikke forfærdeligt, hvis vi springer en fil eller to over. Husk, at hvis du ikke vil gå glip af en enkelt plade, er en Chunk-behandlingsstil mere egnet. Tak til Simon Martinelli for at gøre mig opmærksom på dette.

Siden rigerne fra USA og EU kræver forskellige REST-endepunkter at påberåbe, disse er perfekte til at partitionere. Partitionering betyder, at opgaven kommer til at løbe ind i flere tråde. En tråd pr. skillevæg. I dette tilfælde har vi to partitioner.

For at fuldføre jobdefinitionen skal vi levere en JoB XML-fil. Dette skal placeres i META-INF/batch-jobs vejviser. Her er files-job.xml til dette job:

files-job.xml

<job id="loadRealmAuctionFileJob" xmlns="http://xmlns.jcp.org/xml/ns/javaee" version="1.0">
    <step id="loadRealmAuctionFileStep">
        <batchlet ref="loadAuctionFilesBatchlet">
            <properties>
                <property name="region" value="#{partitionPlan['region']}"/>
                <property name="target" value="#{partitionPlan['target']}"/>
            </properties>
        </batchlet>
        <partition>
            <plan partitions="2">
                <properties partition="0">
                    <property name="region" value="US"/>
                    <property name="target" value="http://us.battle.net/api/wow/auction/data/"/>
                </properties>
                <properties partition="1">
                    <property name="region" value="EU"/>
                    <property name="target" value="http://eu.battle.net/api/wow/auction/data/"/>
                </properties>
            </plan>
        </partition>
    </step>
</job>

I files-job.xml vi skal definere vores Batchlet i batchlet element. For partitionerne skal du blot definere partition element og tildel forskellige properties til hver plan . Disse properties kan derefter bruges til sent at binde værdien til LoadAuctionFilesBatchlet med udtrykkene #{partitionPlan['region']} og #{partitionPlan['target']} . Dette er en meget enkel udtryksbindingsmekanisme og virker kun for simple egenskaber og strenge.

Behandle job

Nu vil vi behandle Realm Auction Data fil. Ved at bruge oplysningerne fra det tidligere job kan vi nu downloade filen og gøre noget med dataene. JSON-filen har følgende struktur:

item-auctions-sample.json

{
    "realm": {
        "name": "Grim Batol",
        "slug": "grim-batol"
    },
    "alliance": {
        "auctions": [
            {
                "auc": 279573567,            // Auction Id
                "item": 22792,               // Item for sale Id
                "owner": "Miljanko",         // Seller Name
                "ownerRealm": "GrimBatol",   // Realm
                "bid": 3800000,              // Bid Value
                "buyout": 4000000,           // Buyout Value
                "quantity": 20,              // Numbers of items in the Auction
                "timeLeft": "LONG",          // Time left for the Auction
                "rand": 0,
                "seed": 1069994368
            },
            {
                "auc": 278907544,
                "item": 40195,
                "owner": "Mongobank",
                "ownerRealm": "GrimBatol",
                "bid": 38000,
                "buyout": 40000,
                "quantity": 1,
                "timeLeft": "VERY_LONG",
                "rand": 0,
                "seed": 1978036736
            }
        ]
    },
    "horde": {
        "auctions": [
            {
                "auc": 278268046,
                "item": 4306,
                "owner": "Thuglifer",
                "ownerRealm": "GrimBatol",
                "bid": 570000,
                "buyout": 600000,
                "quantity": 20,
                "timeLeft": "VERY_LONG",
                "rand": 0,
                "seed": 1757531904
            },
            {
                "auc": 278698948,
                "item": 4340,
                "owner": "Celticpala",
                "ownerRealm": "Aggra(Português)",
                "bid": 1000000,
                "buyout": 1000000,
                "quantity": 10,
                "timeLeft": "LONG",
                "rand": 0,
                "seed": 0
            }
        ]
    }
}

Filen har en liste over Auktionens fra Riget den blev downloadet fra. I hver post kan vi tjekke varen til salg, priser, sælger og tid tilbage til auktionens afslutning. Auktioner er algo aggregeret af Auktionshuset type:Alliance og Horde .

For process-job vi ønsker at læse JSON-filen, transformere dataene og gemme dem i en database. Dette kan opnås ved Chunk Processing. En Chunk er en ETL-behandling (Extract – Transform – Load), som er velegnet til at håndtere store mængder data. En Chunk læser dataene ét element ad gangen og opretter chunks, der vil blive skrevet ud i en transaktion. Et element læses ind fra en ItemReader , afleveret til en ItemProcessor , og aggregeret. Når antallet af læste elementer er lig med commit-intervallet, skrives hele klumpen ud via ItemWriter , og så er transaktionen forpligtet.

ItemReader

De rigtige filer er så store, at de ikke kan indlæses helt i hukommelsen, eller du kan ende med at løbe tør for dem. I stedet bruger vi JSON-P API til at parse dataene på en streaming måde.

AuctionDataItemReader

@Named
public class AuctionDataItemReader extends AbstractAuctionFileProcess implements ItemReader {
    private JsonParser parser;
    private AuctionHouse auctionHouse;

    @Inject
    private JobContext jobContext;
    @Inject
    private WoWBusiness woWBusiness;

    @Override
    public void open(Serializable checkpoint) throws Exception {
        setParser(Json.createParser(openInputStream(getContext().getFileToProcess(FolderType.FI_TMP))));

        AuctionFile fileToProcess = getContext().getFileToProcess();
        fileToProcess.setFileStatus(FileStatus.PROCESSING);
        woWBusiness.updateAuctionFile(fileToProcess);
    }

    @Override
    public void close() throws Exception {
        AuctionFile fileToProcess = getContext().getFileToProcess();
        fileToProcess.setFileStatus(FileStatus.PROCESSED);
        woWBusiness.updateAuctionFile(fileToProcess);
    }

    @Override
    public Object readItem() throws Exception {
        while (parser.hasNext()) {
            JsonParser.Event event = parser.next();
            Auction auction = new Auction();
            switch (event) {
                case KEY_NAME:
                    updateAuctionHouseIfNeeded(auction);

                    if (readAuctionItem(auction)) {
                        return auction;
                    }
                    break;
            }
        }
        return null;
    }

    @Override
    public Serializable checkpointInfo() throws Exception {
        return null;
    }

    protected void updateAuctionHouseIfNeeded(Auction auction) {
        if (parser.getString().equalsIgnoreCase(AuctionHouse.ALLIANCE.toString())) {
            auctionHouse = AuctionHouse.ALLIANCE;
        } else if (parser.getString().equalsIgnoreCase(AuctionHouse.HORDE.toString())) {
            auctionHouse = AuctionHouse.HORDE;
        } else if (parser.getString().equalsIgnoreCase(AuctionHouse.NEUTRAL.toString())) {
            auctionHouse = AuctionHouse.NEUTRAL;
        }

        auction.setAuctionHouse(auctionHouse);
    }

    protected boolean readAuctionItem(Auction auction) {
        if (parser.getString().equalsIgnoreCase("auc")) {
            parser.next();
            auction.setAuctionId(parser.getLong());
            parser.next();
            parser.next();
            auction.setItemId(parser.getInt());
            parser.next();
            parser.next();
            parser.next();
            parser.next();
            auction.setOwnerRealm(parser.getString());
            parser.next();
            parser.next();
            auction.setBid(parser.getInt());
            parser.next();
            parser.next();
            auction.setBuyout(parser.getInt());
            parser.next();
            parser.next();
            auction.setQuantity(parser.getInt());
            return true;
        }
        return false;
    }

    public void setParser(JsonParser parser) {
        this.parser = parser;
    }
}

For at åbne en JSON Parse-stream skal vi bruge Json.createParser og videregive en reference for en inputstream. For at læse elementer skal vi blot kalde hasNext() og next() metoder. Dette returnerer en JsonParser.Event der giver os mulighed for at kontrollere positionen af ​​parseren i strømmen. Elementer læses og returneres i readItem() metode fra Batch API ItemReader . Når der ikke er flere elementer tilgængelige at læse, returneres null for at afslutte behandlingen. Bemærk, at vi også implementerer metoden open og close fra ItemReader . Disse bruges til at initialisere og rydde op i ressourcer. De udfører kun én gang.

ItemProcessor

ItemProcessor er valgfrit. Det bruges til at transformere de data, der blev læst. I dette tilfælde skal vi tilføje yderligere oplysninger til Auktionen .

AuctionDataItemProcessor

@Named
public class AuctionDataItemProcessor extends AbstractAuctionFileProcess implements ItemProcessor {
    @Override
    public Object processItem(Object item) throws Exception {
        Auction auction = (Auction) item;

        auction.setRealm(getContext().getRealm());
        auction.setAuctionFile(getContext().getFileToProcess());

        return auction;
    }
}

ItemWriter

Til sidst skal vi bare skrive dataene ned til en database:

AuctionDataItemWriter

@Named
public class AuctionDataItemWriter extends AbstractItemWriter {
    @PersistenceContext
    protected EntityManager em;

    @Override
    public void writeItems(List<Object> items) throws Exception {
        items.forEach(em::persist);
    }
}

Hele processen med en fil på 70 k rekord tager omkring 20 sekunder på min maskine. Jeg lagde mærke til noget meget interessant. Før denne kode brugte jeg en indsprøjtet EJB, der kaldte en metode med den vedvarende operation. Dette tog 30 sekunder i alt, så at injicere EntityManager og udføre persisten direkte sparede mig for en tredjedel af behandlingstiden. Jeg kan kun spekulere i, at forsinkelsen skyldes en stigning i stackkaldet, med EJB interceptorer i midten. Dette foregik i Wildfly. Jeg vil undersøge dette nærmere.

For at definere klumpen skal vi tilføje den til en process-job.xml-fil:

proces-job.xml

<step id="processFile" next="moveFileToProcessed">
    <chunk item-count="100">
        <reader ref="auctionDataItemReader"/>
        <processor ref="auctionDataItemProcessor"/>
        <writer ref="auctionDataItemWriter"/>
    </chunk>
</step>

I item-count egenskab vi definerer, hvor mange elementer der passer ind i hver del af behandlingen. Det betyder, at for hver 100 transaktionen er forpligtet. Dette er nyttigt for at holde transaktionsstørrelsen lav og for at kontrollere dataene. Hvis vi har brug for at stoppe og derefter genstarte operationen, kan vi gøre det uden at skulle behandle hvert element igen. Den logik skal vi selv kode. Dette er ikke inkluderet i prøven, men jeg vil gøre det i fremtiden.

Kører

For at køre et job skal vi have en reference til en JobOperator . JobOperator giver en grænseflade til at styre alle aspekter af jobbehandling, inklusive operationelle kommandoer, såsom start, genstart og stop, såvel som jobopbevaringsrelaterede kommandoer, såsom hentning af job- og trinudførelser.

For at køre den forrige files-job.xml Opgave vi udfører:

Udfør job

JobOperator jobOperator = BatchRuntime.getJobOperator();
jobOperator.start("files-job", new Properties());

Bemærk, at vi bruger navnet på job-xml-filen uden udvidelsen til JobOperator .

Næste trin

Vi mangler stadig at samle dataene for at udtrække metrics og vise dem på en webside. Dette indlæg er allerede langt, så jeg vil beskrive de følgende trin i et fremtidigt indlæg. Under alle omstændigheder er koden for den del allerede i Github-repoen. Tjek afsnittet Ressourcer.

Ressourcer

Du kan klone en fuld arbejdskopi fra mit github-lager og implementere det til Wildfly. Du kan finde instruktioner der til at implementere det.

Java tag