Java >> Tutoriel Java >  >> Tag >> java.util

Consommer java.util.concurrent.BlockingQueue en tant que rx.Observable

Le modèle classique producteur-consommateur est relativement simple en Java puisque nous avons java.util.concurrent.BlockingQueue . Pour éviter une attente intense et un verrouillage manuel sujet aux erreurs, nous profitons simplement de put() et take() . Ils bloquent tous les deux si la file d'attente est pleine ou vide respectivement. Tout ce dont nous avons besoin, c'est d'un tas de threads partageant une référence à la même file d'attente :certains produisent et d'autres consomment. Et bien sûr, la file d'attente doit avoir une capacité limitée, sinon nous manquerons bientôt de mémoire si les producteurs surpassent les consommateurs. Greg Young n'a pas insisté assez sur cette règle lors de Devoxx Poland :

Ne créez jamais, au grand jamais, une file d'attente illimitée

Producteur-consommateur utilisant BlockingQueue

Voici un exemple le plus simple. Nous avons d'abord besoin d'un producteur qui place les objets dans une file d'attente partagée :

import lombok.Value;
import lombok.extern.slf4j.Slf4j;
 
@Slf4j
@Value
class Producer implements Runnable {
 
  private final BlockingQueue<User> queue;
 
  @Override
  public void run() {
    try {
      while (!Thread.currentThread().isInterrupted()) {
        final User user = new User("User " + System.currentTimeMillis());
        log.info("Producing {}", user);
        queue.put(user);
        TimeUnit.SECONDS.sleep(1);
      }
    } catch (Exception e) {
      log.error("Interrupted", e);
    }
  }
}

Producer publie simplement une instance de User classe (quelle qu'elle soit) à une file d'attente donnée chaque seconde. Évidemment dans la vraie vie en plaçant User dans une file d'attente serait le résultat d'une action au sein d'un système, comme la connexion d'un utilisateur. De même, le consommateur prend de nouveaux éléments dans une file d'attente et les traite :

@Slf4j
@Value
class Consumer implements Runnable {
 
  private final BlockingQueue<User> queue;
 
  @Override
  public void run() {
    try {
      while (!Thread.currentThread().isInterrupted()) {
        final User user = queue.take();
        log.info("Consuming: {}", user);
      }
    } catch (Exception e) {
      log.error("Interrupted", e);
    }
  }
}

Encore une fois, dans la vie réelle, le traitement signifierait stocker dans une base de données ou exécuter une détection de fraude sur un utilisateur. Nous utilisons la file d'attente pour dissocier le thread de traitement du thread consommateur, par ex. pour réduire la latence. Pour exécuter un test simple, lançons quelques threads producteurs et consommateurs :

BlockingQueue<User> queue = new ArrayBlockingQueue<>(1_000);
final List<Runnable> runnables = Arrays.asList(
    new Producer(queue),
    new Producer(queue),
    new Consumer(queue),
    new Consumer(queue),
    new Consumer(queue)
);
 
final List<Thread> threads = runnables
    .stream()
    .map(runnable -> new Thread(runnable, threadName(runnable)))
    .peek(Thread::start)
    .collect(toList());
 
TimeUnit.SECONDS.sleep(5);
threads.forEach(Thread::interrupt);
 
//...
 
private static String threadName(Runnable runnable) {
  return runnable.getClass().getSimpleName() + "-" + System.identityHashCode(runnable);
}

Nous avons 2 producteurs et 3 consommateurs, tout semble fonctionner. Dans la vraie vie, vous auriez probablement des threads producteurs implicites, comme les threads de traitement des requêtes HTTP. Du côté du consommateur, vous utiliserez très probablement un pool de threads. Ce schéma fonctionne bien, mais surtout le côté consommation est assez bas.

Présentation de ObservableQueue<T>

Le but de cet article est d'introduire une abstraction qui se comporte comme une file d'attente côté producteur mais comme un Observable de RxJava côté consommateur. En d'autres termes, nous pouvons traiter les objets ajoutés à une file d'attente comme un flux que nous pouvons mapper, filtrer, composer, etc. côté client. Fait intéressant, ce n'est plus une file d'attente sous le capot. ObservableQueue<T> transmet simplement tous les nouveaux objets directement aux consommateurs abonnés et ne met pas les événements en mémoire tampon au cas où personne n'écoute ("hot » observable). ObservableQueue<T> n'est pas une file d'attente en soi, c'est juste un pont entre une API et l'autre. C'est similaire à java.util.concurrent.SynchronousQueue , mais si personne n'est intéressé à consommer, l'objet est simplement jeté.

Voici une première implémentation expérimentale. C'est juste un code de jouet, ne le considérez pas comme prêt pour la production. De plus, nous le simplifierons considérablement plus tard :

public class ObservableQueue<T> implements BlockingQueue<T>, Closeable {
 
  private final Set<Subscriber<? super T>> subscribers = Collections.newSetFromMap(new ConcurrentHashMap<>());
  private final Observable<T> observable = Observable.create(subscriber -> {
    subscriber.add(new Subscription() {
      @Override
      public void unsubscribe() {
        subscribers.remove(subscriber);
      }
 
      @Override
      public boolean isUnsubscribed() {
        return false;
      }
    });
    subscribers.add(subscriber);
  });
 
  public Observable<T> observe() {
    return observable;
  }
 
  @Override
  public boolean add(T t) {
    return offer(t);
  }
 
  @Override
  public boolean offer(T t) {
    subscribers.forEach(subscriber -> subscriber.onNext(t));
    return true;
  }
 
  @Override
  public T remove() {
    return noSuchElement();
  }
 
  @Override
  public T poll() {
    return null;
  }
 
  @Override
  public T element() {
    return noSuchElement();
  }
 
  private T noSuchElement() {
    throw new NoSuchElementException();
  }
 
  @Override
  public T peek() {
    return null;
  }
 
  @Override
  public void put(T t) throws InterruptedException {
    offer(t);
  }
 
  @Override
  public boolean offer(T t, long timeout, TimeUnit unit) throws InterruptedException {
    return offer(t);
  }
 
  @Override
  public T take() throws InterruptedException {
    throw new UnsupportedOperationException("Use observe() instead");
  }
 
  @Override
  public T poll(long timeout, TimeUnit unit) throws InterruptedException {
    return null;
  }
 
  @Override
  public int remainingCapacity() {
    return 0;
  }
 
  @Override
  public boolean remove(Object o) {
    return false;
  }
 
  @Override
  public boolean containsAll(Collection<?> c) {
    return false;
  }
 
  @Override
  public boolean addAll(Collection<? extends T> c) {
    c.forEach(this::offer);
    return true;
  }
 
  @Override
  public boolean removeAll(Collection<?> c) {
    return false;
  }
 
  @Override
  public boolean retainAll(Collection<?> c) {
    return false;
  }
 
  @Override
  public void clear() {
  }
 
  @Override
  public int size() {
    return 0;
  }
 
  @Override
  public boolean isEmpty() {
    return true;
  }
 
  @Override
  public boolean contains(Object o) {
    return false;
  }
 
  @Override
  public Iterator<T> iterator() {
    return Collections.emptyIterator();
  }
 
  @Override
  public Object[] toArray() {
    return new Object[0];
  }
 
  @Override
  public <T> T[] toArray(T[] a) {
    return a;
  }
 
  @Override
  public int drainTo(Collection<? super T> c) {
    return 0;
  }
 
  @Override
  public int drainTo(Collection<? super T> c, int maxElements) {
    return 0;
  }
 
  @Override
  public void close() throws IOException {
    subscribers.forEach(rx.Observer::onCompleted);
  }
}

Il y a quelques faits intéressants à ce sujet :

  1. Nous devons suivre tous les abonnés, c'est-à-dire les consommateurs qui souhaitent recevoir de nouveaux articles. Si l'un des abonnés n'est plus intéressé, nous devons supprimer cet abonné, sinon une fuite de mémoire se produira (continuez à lire !)
  2. Cette file d'attente se comporte comme si elle était toujours vide. Il ne contient jamais d'éléments - lorsque vous mettez quelque chose dans cette file d'attente, il est automatiquement transmis aux abonnés et oublié
  3. Techniquement, cette file d'attente est illimitée (!), ce qui signifie que vous pouvez mettre autant d'éléments que vous le souhaitez. Cependant, comme les éléments sont transmis à tous les abonnés (le cas échéant) et immédiatement supprimés, cette file d'attente est en fait toujours vide (voir ci-dessus)
  4. Il est toujours possible que le producteur génère trop d'événements et que les consommateurs ne puissent pas suivre cela :RxJava dispose désormais d'un support de contre-pression, non couvert dans cet article.

Le producteur peut utiliser ObservableQueue<T> comme n'importe quel autre BlockingQueue<T> , en supposant que j'ai correctement implémenté le contrat de file d'attente. Cependant, le consommateur semble beaucoup plus léger et plus intelligent :

final ObservableQueue<User> users = new ObservableQueue<>();
final Observable<User> observable = users.observe();
 
users.offer(new User("A"));
observable.subscribe(user -> log.info("User logged in: {}", user));
users.offer(new User("B"));
users.offer(new User("C"));

Le code ci-dessus imprime "B" et "C" seulement. "A" est perdu à dessein depuis ObservableQueue laisse tomber des objets au cas où personne n'écoute. Évidemment Producer la classe utilise maintenant users file d'attente. Tout fonctionne bien, vous pouvez appeler le users.observe() à tout moment et appliquer l'un des dizaines de Observable les opérateurs. Cependant, il y a une mise en garde :par défaut, RxJava n'impose aucun thread, donc la consommation se produit dans le même thread que la production ! Nous avons perdu la caractéristique la plus importante du modèle producteur-consommateur, à savoir le découplage des fils. Heureusement, tout est déclaratif dans RxJava, la planification des threads également :

users
    .observe()
    .observeOn(Schedulers.computation())
    .forEach(user ->
            log.info("User logged in: {}", user)
    );

Voyons maintenant une réelle puissance de RxJava. Imaginez que vous vouliez compter le nombre d'utilisateurs qui se connectent par seconde, où chaque connexion est placée en tant qu'événement dans une file d'attente :

users
  .observe()
  .map(User::getName)
  .filter(name -> !name.isEmpty())
  .window(1, TimeUnit.SECONDS)
  .flatMap(Observable::count)
  .doOnCompleted(() -> log.info("System shuts down"))
  .forEach(c -> log.info("Logins in last second: {}", c));

Les performances sont également acceptables, une telle file d'attente peut accepter environ 3 millions d'objets par seconde sur mon ordinateur portable avec un abonné. Traitez cette classe comme un adaptateur des systèmes hérités utilisant des files d'attente vers le monde réactif moderne. Mais attendez! Utilisation de ObservableQueue<T> est facile, mais l'implémentation avec subscribers l'ensemble synchronisé semble trop bas niveau. Heureusement, il y a Subject<T, T> . Subject est "l'autre côté" de Observable – vous pouvez pousser les événements vers Subject mais il implémente toujours Observable , vous pouvez donc facilement créer des Observable arbitraires . Regardez comme c'est beau ObservableQueue ressemble à l'un des Subject implémentations :

public class ObservableQueue<T> implements BlockingQueue<T>, Closeable {
 
  private final Subject<T, T> subject = PublishSubject.create();
 
  public Observable<T> observe() {
    return subject;
  }
 
  @Override
  public boolean add(T t) {
    return offer(t);
  }
 
  @Override
  public boolean offer(T t) {
    subject.onNext(t);
    return true;
  }
 
  @Override
  public void close() throws IOException {
    subject.onCompleted();
  }
  @Override
  public T remove() {
    return noSuchElement();
  }
 
  @Override
  public T poll() {
    return null;
  }
 
  @Override
  public T element() {
    return noSuchElement();
  }
 
  private T noSuchElement() {
    throw new NoSuchElementException();
  }
 
  @Override
  public T peek() {
    return null;
  }
 
  @Override
  public void put(T t) throws InterruptedException {
    offer(t);
  }
 
  @Override
  public boolean offer(T t, long timeout, TimeUnit unit) throws InterruptedException {
    return offer(t);
  }
 
  @Override
  public T take() throws InterruptedException {
    throw new UnsupportedOperationException("Use observe() instead");
  }
 
  @Override
  public T poll(long timeout, TimeUnit unit) throws InterruptedException {
    return null;
  }
 
  @Override
  public int remainingCapacity() {
    return 0;
  }
 
  @Override
  public boolean remove(Object o) {
    return false;
  }
 
  @Override
  public boolean containsAll(Collection<?> c) {
    return false;
  }
 
  @Override
  public boolean addAll(Collection<? extends T> c) {
    c.forEach(this::offer);
    return true;
  }
 
  @Override
  public boolean removeAll(Collection<?> c) {
    return false;
  }
 
  @Override
  public boolean retainAll(Collection<?> c) {
    return false;
  }
 
  @Override
  public void clear() {
  }
 
  @Override
  public int size() {
    return 0;
  }
 
  @Override
  public boolean isEmpty() {
    return true;
  }
 
  @Override
  public boolean contains(Object o) {
    return false;
  }
 
  @Override
  public Iterator<T> iterator() {
    return Collections.emptyIterator();
  }
 
  @Override
  public Object[] toArray() {
    return new Object[0];
  }
 
  @Override
  public <T> T[] toArray(T[] a) {
    return a;
  }
 
  @Override
  public int drainTo(Collection<? super T> c) {
    return 0;
  }
 
  @Override
  public int drainTo(Collection<? super T> c, int maxElements) {
    return 0;
  }
 
}

L'implémentation ci-dessus est beaucoup plus propre et nous n'avons pas du tout à nous soucier de la synchronisation des threads.

Balise Java