Java >> Java tutorial >  >> Tag >> java.util

Oversigt over java.util.concurrent

1. Oversigt

java.util.concurrent pakken indeholder værktøjer til at oprette samtidige applikationer.

I denne artikel vil vi lave et overblik over hele pakken.

2. Hovedkomponenter

java.util.concurrent indeholder alt for mange funktioner til at diskutere i en enkelt skrivning. I denne artikel vil vi hovedsageligt fokusere på nogle af de mest nyttige værktøjer fra denne pakke som:

  • Udfører
  • ExecutorService
  • ScheduledExecutorService
  • Fremtid
  • CountDownLatch
  • CyclicBarrier
  • Semafor
  • ThreadFactory
  • Blokeringskø
  • DelayQueue
  • Låse
  • Phaser

Du kan også finde mange dedikerede artikler til individuelle klasser her.

2.1. Udfører

Udfører er en grænseflade, der repræsenterer et objekt, der udfører angivne opgaver.

Det afhænger af den konkrete implementering (hvorfra invokationen initieres), om opgaven skal køres på en ny eller aktuel tråd. Ved at bruge denne grænseflade kan vi derfor afkoble opgaveudførelsesstrømmen fra den faktiske opgaveudførelsesmekanisme.

Et punkt at bemærke her er, at Executor kræver strengt taget ikke, at opgaveudførelsen er asynkron. I det enkleste tilfælde kan en eksekutør påkalde den indsendte opgave øjeblikkeligt i den påkaldende tråd.

Vi skal oprette en invoker for at oprette eksekveringsinstansen:

public class Invoker implements Executor {
    @Override
    public void execute(Runnable r) {
        r.run();
    }
}

Nu kan vi bruge denne invoker til at udføre opgaven.

public void execute() {
    Executor executor = new Invoker();
    executor.execute( () -> {
        // task to be performed
    });
}

Punkt at bemærke her er, at hvis udføreren ikke kan acceptere opgaven til udførelse, vil den kaste RejectedExecutionException .

2.2. ExecutorService

ExecutorService er en komplet løsning til asynkron behandling. Den administrerer en kø i hukommelsen og planlægger indsendte opgaver baseret på trådtilgængelighed.

For at bruge ExecutorService, vi skal oprette en Kørbar klasse.

public class Task implements Runnable {
    @Override
    public void run() {
        // task details
    }
}

Nu kan vi oprette ExecutorService instans og tildel denne opgave. På oprettelsestidspunktet skal vi angive trådpoolstørrelsen.

ExecutorService executor = Executors.newFixedThreadPool(10);

Hvis vi ønsker at oprette en enkelt-trådet ExecutorService for eksempel kan vi bruge newSingleThreadExecutor(ThreadFactory threadFactory) for at oprette instansen.

Når udføreren er oprettet, kan vi bruge den til at sende opgaven.

public void execute() { 
    executor.submit(new Task()); 
}

Vi kan også oprette Runnable forekomst, mens du sender opgaven.

executor.submit(() -> {
    new Task();
});

Den leveres også med to out-of-the-box afslutningsmetoder for eksekvering. Den første er shutdown(); den venter, indtil alle de indsendte opgaver er færdige med at blive udført. Den anden metode er shutdownNow() som forsøger at afslutte alle aktivt udførende opgaver og stopper behandlingen af ​​ventende opgaver.

Der er også en anden metode awaitTermination(lang timeout, TimeUnit unit) som blokerer kraftigt, indtil alle opgaver har fuldført eksekveringen efter en nedlukningshændelse udløst eller eksekveringstimeout opstod, eller selve udførelsestråden er afbrudt,

try {
    executor.awaitTermination( 20l, TimeUnit.NANOSECONDS );
} catch (InterruptedException e) {
    e.printStackTrace();
}

2.3. ScheduledExecutorService

ScheduledExecutorService er en lignende grænseflade til ExecutorService, men den kan udføre opgaver med jævne mellemrum.

Executor and ExecutorService ’s metoder er planlagt på stedet uden at indføre nogen kunstig forsinkelse. Nul eller en negativ værdi betyder, at anmodningen skal udføres øjeblikkeligt.

Vi kan bruge begge Runnable og Kan opkaldes grænseflade til at definere opgaven.

public void execute() {
    ScheduledExecutorService executorService
      = Executors.newSingleThreadScheduledExecutor();

    Future<String> future = executorService.schedule(() -> {
        // ...
        return "Hello world";
    }, 1, TimeUnit.SECONDS);

    ScheduledFuture<?> scheduledFuture = executorService.schedule(() -> {
        // ...
    }, 1, TimeUnit.SECONDS);

    executorService.shutdown();
}

ScheduledExecutorService kan også planlægge opgaven efter en given fast forsinkelse :

executorService.scheduleAtFixedRate(() -> {
    // ...
}, 1, 10, TimeUnit.SECONDS);

executorService.scheduleWithFixedDelay(() -> {
    // ...
}, 1, 10, TimeUnit.SECONDS);

Her er scheduleAtFixedRate( Runnable command, long initialDelay, long period, TimeUnit unit ) metoden opretter og udfører en periodisk handling, der påkaldes først efter den angivne indledende forsinkelse, og efterfølgende med den givne periode, indtil serviceinstansen lukker ned.

scheduleWithFixedDelay( Kørbar kommando, long initialDelay, long delay, TimeUnit unit ) metoden opretter og udfører en periodisk handling, der påkaldes først efter den angivne indledende forsinkelse, og gentagne gange med den givne forsinkelse mellem afslutningen af ​​den udførende og påkaldelsen af ​​den næste.

2.4. Fremtid

Fremtid bruges til at repræsentere resultatet af en asynkron operation. Det leveres med metoder til at kontrollere, om den asynkrone operation er fuldført eller ej, få det beregnede resultat osv.

Hvad mere er, cancel(boolean mayInterruptIfRunning) API annullerer handlingen og frigiver den eksekverende tråd. Hvis værdien af ​​mayInterruptIfRunning er sandt, vil tråden, der udfører opgaven, blive afsluttet øjeblikkeligt.

Ellers vil igangværende opgaver få lov til at fuldføre.

Vi kan bruge nedenstående kodestykke til at oprette en fremtidig instans:

public void invoke() {
    ExecutorService executorService = Executors.newFixedThreadPool(10);

    Future<String> future = executorService.submit(() -> {
        // ...
        Thread.sleep(10000l);
        return "Hello world";
    });
}

Vi kan bruge følgende kodestykke til at kontrollere, om det fremtidige resultat er klar, og hente dataene, hvis beregningen er udført:

if (future.isDone() && !future.isCancelled()) {
    try {
        str = future.get();
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
}

Vi kan også angive en timeout for en given operation. Hvis opgaven tager mere end denne tid, en TimeoutException er kastet:

try {
    future.get(10, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
    e.printStackTrace();
}

2.5. CountDownLatch

CountDownLatch (introduceret i JDK 5 ) er en hjælpeklasse, som blokerer et sæt tråde, indtil en eller anden handling er fuldført.

En CountDownLatch initialiseres med en tæller(heltal type); denne tæller falder, efterhånden som de afhængige tråde fuldfører eksekveringen. Men når tælleren når nul, bliver andre tråde frigivet.

Du kan lære mere om CountDownLatch her.

2.6. CyclicBarrier

CyclicBarrier fungerer næsten det samme som CountDownLatch bortset fra at vi kan genbruge det. I modsætning til CountDownLatch , det tillader flere tråde at vente på hinanden ved hjælp af await() metode (kendt som barrieretilstand) før den endelige opgave startes.

Vi skal oprette en Runnable opgaveinstans for at starte barrieretilstanden:

public class Task implements Runnable {

    private CyclicBarrier barrier;

    public Task(CyclicBarrier barrier) {
        this.barrier = barrier;
    }

    @Override
    public void run() {
        try {
            LOG.info(Thread.currentThread().getName() + 
              " is waiting");
            barrier.await();
            LOG.info(Thread.currentThread().getName() + 
              " is released");
        } catch (InterruptedException | BrokenBarrierException e) {
            e.printStackTrace();
        }
    }

}

Nu kan vi påkalde nogle tråde for at race for barrieretilstanden:

public void start() {

    CyclicBarrier cyclicBarrier = new CyclicBarrier(3, () -> {
        // ...
        LOG.info("All previous tasks are completed");
    });

    Thread t1 = new Thread(new Task(cyclicBarrier), "T1"); 
    Thread t2 = new Thread(new Task(cyclicBarrier), "T2"); 
    Thread t3 = new Thread(new Task(cyclicBarrier), "T3"); 

    if (!cyclicBarrier.isBroken()) { 
        t1.start(); 
        t2.start(); 
        t3.start(); 
    }
}

Her er isBroken() metoden kontrollerer, om nogen af ​​trådene blev afbrudt i løbet af udførelsestiden. Vi bør altid udføre denne kontrol, før vi udfører den faktiske proces.

2.7. Semaphore

Semaforen bruges til at blokere trådniveauadgang til en del af den fysiske eller logiske ressource. En semafor indeholder et sæt tilladelser; hver gang en tråd forsøger at komme ind i den kritiske sektion, skal den tjekke semaforen, om en tilladelse er tilgængelig eller ej.

Hvis en tilladelse ikke er tilgængelig (via tryAcquire() ), tråden må ikke hoppe ind i det kritiske afsnit; Men hvis tilladelsen er tilgængelig, gives adgangen, og tilladelsestælleren falder.

Når den eksekverende tråd frigiver den kritiske sektion, øges tilladelsestælleren igen (gjort af release() metode).

Vi kan angive en timeout for at opnå adgang ved at bruge tryAcquire(lang timeout, TimeUnit unit) metode.

Vi kan også kontrollere antallet af tilgængelige tilladelser eller antallet af tråde, der venter på at erhverve semaforen.

Følgende kodestykke kan bruges til at implementere en semafor:

static Semaphore semaphore = new Semaphore(10);

public void execute() throws InterruptedException {

    LOG.info("Available permit : " + semaphore.availablePermits());
    LOG.info("Number of threads waiting to acquire: " + 
      semaphore.getQueueLength());

    if (semaphore.tryAcquire()) {
        try {
            // ...
        }
        finally {
            semaphore.release();
        }
    }

}

Vi kan implementere en Mutex som datastruktur ved hjælp af Semaphore . Flere detaljer om dette kan findes her.

2.8. ThreadFactory

Som navnet antyder, ThreadFactory fungerer som en tråd (ikke-eksisterende) pulje, som opretter en ny tråd efter behov. Det eliminerer behovet for en masse kedelkodning for at implementere effektive trådskabelsesmekanismer.

Vi kan definere en ThreadFactory :

public class BaeldungThreadFactory implements ThreadFactory {
    private int threadId;
    private String name;

    public BaeldungThreadFactory(String name) {
        threadId = 1;
        this.name = name;
    }

    @Override
    public Thread newThread(Runnable r) {
        Thread t = new Thread(r, name + "-Thread_" + threadId);
        LOG.info("created new thread with id : " + threadId +
            " and name : " + t.getName());
        threadId++;
        return t;
    }
}

Vi kan bruge denne newThread(Runnable r) metode til at oprette en ny tråd under kørsel:

BaeldungThreadFactory factory = new BaeldungThreadFactory( 
    "BaeldungThreadFactory");
for (int i = 0; i < 10; i++) { 
    Thread t = factory.newThread(new Task());
    t.start(); 
}

2.9. Blokeringskø

I asynkron programmering er et af de mest almindelige integrationsmønstre producent-forbruger-mønsteret. java.util.concurrent pakken leveres med en datastruktur kendt som BlockingQueue – hvilket kan være meget nyttigt i disse async-scenarier.

Mere information og et fungerende eksempel på dette er tilgængeligt her.

2.10. DelayQueue

DelayQueue er en uendelig størrelse blokerende kø af elementer, hvor et element kun kan trækkes, hvis dets udløbstid (kendt som brugerdefineret forsinkelse) er fuldført. Derfor er det øverste element (hoved ) vil have den største forsinkelse, og den vil blive pollet sidst.

Mere information og et fungerende eksempel på dette er tilgængeligt her.

2.11. Låse

Ikke overraskende Lås er et værktøj til at blokere andre tråde fra at få adgang til et bestemt kodesegment, bortset fra den tråd, der udfører det i øjeblikket.

Hovedforskellen mellem en lås og en synkroniseret blok er, at synkroniseret blok er fuldt ud indeholdt i en metode; dog kan vi have Lock API's lock() og unlock() operation i separate metoder.

Mere information og et fungerende eksempel på dette er tilgængeligt her.

2.12. Phaser

Phaser er en mere fleksibel løsning end CyclicBarrier og CountDownLatch – bruges til at fungere som en genanvendelig barriere, hvor det dynamiske antal tråde skal vente, før de fortsætter udførelse. Vi kan koordinere flere faser af udførelse ved at genbruge en Phaser instans for hver programfase.

Mere information og et fungerende eksempel på dette er tilgængeligt her.

3. Konklusion

I denne oversigtsartikel på højt niveau har vi fokuseret på de forskellige tilgængelige værktøjer i java.util.concurrent pakke.

Som altid er den fulde kildekode tilgængelig på GitHub.


Java tag