Java >> Programma Java >  >> Java

Lavora con flussi di database paralleli utilizzando pool di thread personalizzati

Stream di database paralleli

Nel mio post precedente, ho scritto dell'elaborazione dei contenuti del database in parallelo utilizzando flussi paralleli e Speedment. I flussi paralleli possono, in molte circostanze, essere significativamente più veloci dei normali flussi di database sequenziali.






Il pool di thread


Speedment è un toolkit Java Stream ORM open source e uno strumento Java runtime che racchiude un database esistente e le relative tabelle in flussi Java 8. Possiamo utilizzare un database esistente ed eseguire lo strumento Speedment e genererà classi POJO che corrispondono alle tabelle che abbiamo selezionato utilizzando lo strumento. Una caratteristica distintiva di Speedment è che supporta flussi di database paralleli e che può utilizzare diverse strategie parallele per ottimizzare ulteriormente le prestazioni. Per impostazione predefinita, i flussi paralleli vengono eseguiti sul comune  ForkJoinPool dove potenzialmente potrebbero competere con altri compiti. In questo post impareremo come eseguire flussi di database paralleli sul nostro personalizzato
ForkJoinPool, consentendo un controllo molto migliore del nostro ambiente di esecuzione.

Iniziare con Speedment

Vai a  open source Speedment su GitHub e scopri come iniziare con un progetto Speedment. Collegare lo strumento a un database esistente è davvero semplice. Leggi il mio
post precedente per ulteriori informazioni sull'aspetto della tabella del database e della classe PrimeUtil per gli esempi seguenti.

Esecuzione sul ForkJoinPool predefinito

Ecco l'applicazione di cui ho parlato nel mio precedente post che eseguirà la scansione di una tabella di database in parallelo per i candidati numeri primi indeterminati e quindi determinerà se sono primi o meno e aggiornerà la tabella di conseguenza. Ecco come appare:

Manager<PrimeCandidate> candidatesHigh = app.configure(PrimeCandidateManager.class)
            .withParallelStrategy(ParallelStrategy.computeIntensityHigh())
            .build();

        candidatesHigh.stream() 
            .parallel()                                                // Use a parallel stream
            .filter(PrimeCandidate.PRIME.isNull())                     // Only consider nondetermined prime candidates
            .map(pc -> pc.setPrime(PrimeUtil.isPrime(pc.getValue())))  // Sets if it is a prime or not
            .forEach(candidatesHigh.updater());                        // Apply the Manager's updater

Innanzitutto, creiamo uno stream su tutti i candidati (usando una strategia parallela denominata ParallelStrategy.computeIntensityHigh()) in cui la colonna "prime" è  null utilizzando il stream().filter(PrimeCandidate.PRIME.isNull()) metodo. Quindi, per ogni PC candidato principale, impostiamo la colonna "prime" su  true se  pc.getValue() è un numero primo o  false se  pc.getValue() non è un primo. È interessante notare che il  pc.setPrime() restituisce l'entità pc stesso, consentendoci di taggare facilmente più operazioni di flusso. Nell'ultima riga aggiorniamo il database con l'esito del nostro controllo applicando il  candidatesHigh.updater() funzione.

Ancora una volta, assicurati di controllare il mio post precedente sui dettagli e i vantaggi delle strategie parallele. In breve, la strategia parallela predefinita di Java funziona bene per basse richieste di calcolo perché pone una grande quantità di elementi di lavoro iniziali su ciascun thread. Le strategie parallele di Speedment funzionano molto meglio per richieste di calcolo medio-alte, in cui una piccola quantità di elementi di lavoro è disposta sui thread partecipanti.

Lo stream determinerà i numeri primi completamente paralleli e i thread di esecuzione utilizzeranno il comune   ForkJoinPool come si può vedere in questa immagine (il mio laptop ha 4 core CPU e 8 thread CPU):

Utilizza un servizio di esecuzione personalizzato

Come abbiamo appreso all'inizio di questo post, i flussi paralleli vengono eseguiti dal comune
ForkJoinPool per impostazione predefinita. Ma a volte vogliamo usare il nostro Esecutore, forse perché abbiamo paura di inondare il comune
ForkJoinPool , in modo che altre attività non possano essere eseguite correttamente. La definizione del nostro esecutore può essere eseguita facilmente per Speedment (e altre librerie di flussi) in questo modo:

final ForkJoinPool forkJoinPool = new ForkJoinPool(3);
    forkJoinPool.submit(() -> 
        
        candidatesHigh.stream() 
            .parallel()
            .filter(PrimeCandidate.PRIME.isNull())
            .map(pc -> pc.setPrime(PrimeUtil.isPrime(pc.getValue())))
            .forEach(candidatesHigh.updater()); 
            
    );

    try {
        forkJoinPool.shutdown();
        forkJoinPool.awaitTermination(1, TimeUnit.HOURS);
    } catch (InterruptedException ie) {
        ie.printStackTrace();
    }

Il codice dell'applicazione non è modificato, ma è racchiuso in un  ForkJoinPool personalizzato che possiamo controllarci. Nell'esempio sopra, impostiamo un pool di thread con solo tre thread di lavoro. I thread di lavoro non sono condivisi con i thread nel comune  ForkJoinPool .

Ecco come appaiono i thread usando il servizio di esecuzione personalizzato:

In questo modo possiamo controllare sia il  ThreadPool effettivo stesso e precisamente come gli elementi di lavoro sono disposti in quel pool usando una strategia parallela!

Mantieni il calore nelle tue piscine!

Etichetta Java