Java >> Java Tutorial >  >> Tag >> Queue

Verwenden von Bull-Warteschlangen in der NestJS-Anwendung

In vielen Szenarien müssen Sie asynchrone CPU-intensive Aufgaben bewältigen. Insbesondere, wenn eine Anwendung Daten über die REST-API anfordert. Der REST-Endpunkt sollte innerhalb eines begrenzten Zeitrahmens antworten.

In diesem Beitrag werde ich zeigen, wie wir Warteschlangen verwenden können, um asynchrone Aufgaben zu bewältigen. Wir werden Bull-Warteschlangen in einer einfachen NestJS-Anwendung verwenden.

Warteschlangen sind eine Datenstruktur, die einer linearen Reihenfolge folgt. In den meisten Systemen verhalten sich Warteschlangen wie eine Reihe von Aufgaben. Ein Herausgeber veröffentlicht eine Nachricht oder Aufgabe in der Warteschlange. Ein Verbraucher nimmt diese Nachricht zur weiteren Verarbeitung auf. Dies kann asynchron geschehen und bietet die dringend benötigte Ruhepause für CPU-intensive Aufgaben. Sobald der Verbraucher die Nachricht konsumiert hat, steht die Nachricht keinem anderen Verbraucher zur Verfügung.

Bull-Warteschlangen basieren auf Redis. In meinem vorherigen Beitrag habe ich behandelt, wie man eine Zustandsprüfung für Redis oder eine Datenbank in einer NestJS-Anwendung hinzufügt.

  • NestJS-Anwendung einrichten
  • Bull-Warteschlangen in der NestJS-Anwendung
  • Implementieren eines Prozessors zum Verarbeiten von Warteschlangendaten
  • Integration von Bull Dashboard
    • Bull Board-Klasse hinzufügen
    • Controller hinzufügen
  • Schlussfolgerung

NestJS-Anwendung einrichten

Im Rahmen dieser Demo erstellen wir eine einfache Anwendung. Wir werden Benutzerdaten über eine CSV-Datei hochladen. Ein Controller akzeptiert diese Datei und übergibt sie an eine Warteschlange. Ein Prozessor übernimmt den Auftrag in der Warteschlange und verarbeitet die Datei, um Daten aus der CSV-Datei in der Datenbank zu speichern.

nest new bullqueuedemo

Sobald dieser Befehl den Ordner für bullqueuedemo erstellt , richten wir Prisma ORM ein, um eine Verbindung zur Datenbank herzustellen. (Hinweis – stellen Sie sicher, dass Sie Prisma-Abhängigkeiten installieren.).

npx prisma init

Wenn Sie einen Windows-Computer verwenden, tritt möglicherweise ein Fehler beim Ausführen von prisma init auf. Alles in allem sollten Sie eine Umgebungsvariable einrichten, um diesen Fehler zu vermeiden.

set PRISMA_CLI_QUERY_ENGINE_TYPE=binary

set PRISMA_CLIENT_ENGINE_TYPE=binary

Sobald das Schema erstellt ist, aktualisieren wir es mit unseren Datenbanktabellen. Für diese Demo erstellen wir eine einzelne Tabelle user .

// This is your Prisma schema file,
// learn more about it in the docs: https://pris.ly/d/prisma-schema

generator client {
  provider = "prisma-client-js"
  engineType = "binary"
}

datasource db {
  provider = "mysql"
  url      = env("DATABASE_URL")
}


model User {
  id    Int     @default(autoincrement()) @id
  email String  @unique
  first_name  String
  last_name   String?
}

Wenn wir jetzt npm run prisma migrate dev ausführen , wird eine Datenbanktabelle erstellt.

Zusammenfassend haben wir bisher eine NestJS-Anwendung erstellt und unsere Datenbank mit Prisma ORM eingerichtet. Schauen wir uns die Konfiguration an, die wir für Bull Queue hinzufügen müssen.

Bull-Warteschlangen in der NestJS-Anwendung

Installieren Sie @nestjs/bull Abhängigkeit. Diese Abhängigkeit kapselt die bull-Bibliothek. Wir gehen davon aus, dass Sie redis haben installiert und läuft. Standardmäßig wird Redis auf Port 6379 ausgeführt.

Wir werden REDIS_HOST hinzufügen und REDIS_PORT als Umgebungsvariablen in unserem .env Datei. Installieren Sie zwei Abhängigkeiten für Bull wie folgt:

npm install @nestjs/bull

npm install @types/bull

Anschließend richten wir die Verbindung mit Redis ein, indem wir BullModule hinzufügen zu unserem App-Modul.

@Module({
  imports: [
    BullModule.forRootAsync({
      imports: [ConfigModule],
      useFactory: async (configService: ConfigService) => ({
        redis: {
          host: configService.get('REDIS_HOST'),
          port: Number(configService.get('REDIS_PORT')),
        },
      }),
      inject: [ConfigService]
    }),
    BullModule.registerQueue({
      name: 'file-upload-queue'
    }), 
  ],
  controllers: [AppController, BullBoardController],
  providers: [UserService, PrismaService, FileUploadProcessor,],
})
export class AppModule {}

Wir injizieren ConfigService. Dieser Dienst ermöglicht es uns, Umgebungsvariablen zur Laufzeit abzurufen. Damit können wir BullModule verwenden in unserer Anwendung.

Wie Sie im obigen Code sehen können, haben wir BullModule.registerQueue und das registriert unsere Warteschlange file-upload-queue . Lassen Sie uns nun diese Warteschlange in unserem Controller hinzufügen, wo sie verwendet wird.

@Controller('/api/bullqueuedemo')
export class AppController {
  constructor(@InjectQueue('file-upload-queue') private fileQueue: Queue) {
    queuePool.add(fileQueue);
  }

  @Post('/uploadFile')
  @UseInterceptors(FileInterceptor("csv", {
    storage: diskStorage({
      destination: './csv',
      fileName: (req, file, cb) => {
        const randomName = Array(32).fill(null).map(() => (Math.round(Math.random() * cb(null, `${randomName}${extname(file.originalname)}`))))
      }
    })
  }))
  async uploadCsvFile(@UploadedFile() file): Promise {
    const job = await this.fileQueue.add('csvfilejob', {file: file});
    console.log(`created job ${ job.id}`);
  }

  @Get('/')
  async getHello(): Promise {
    return "Hello World";
  }
}

Lassen Sie uns diesen Code langsam durchgehen, um zu verstehen, was passiert.

  • Im Konstruktor fügen wir die Warteschlange InjectQueue('file-upload-queue') ein .
  • Unsere POST-API dient zum Hochladen einer CSV-Datei.
  • Wir verwenden einen FileInterceptor. Dies ist eine Funktion, die NestJS anbietet, um die Anfrage abzufangen und Dateien aus der Anfrage zu extrahieren. Dieser Interceptor nimmt zwei Argumente fieldName und options .
  • storage Option ermöglicht es uns, die hochgeladene Datei in einem Ordner namens csv zu speichern im aktuellen Ausführungsverzeichnis. Die hochgeladene Datei wird mit einem zufällig generierten Namen und der Erweiterung .csv umbenannt .
  • In der Methode uploadCsvFile , erhalten wir die hochgeladene Datei. Dies kommt von unserem FileInterceptor. Wir verwenden unsere injizierte Warteschlange, um einen Job mit dem Namen csvfilejob hinzuzufügen und Daten, die die Datei enthalten.

Implementieren eines Prozessors zum Verarbeiten von Warteschlangendaten

Danach haben wir unserer Warteschlange file-upload-queue einen Job hinzugefügt . Um diesen Job nun weiter zu verarbeiten, implementieren wir einen Prozessor FileUploadProcessor .

Wir werden diesen Verbraucher mit @Processor('file-upload-queue') annotieren .

@Processor('file-upload-queue')
export class FileUploadProcessor {

    constructor(private readonly userService: UserService){}

    @Process('csvfilejob')
    async processFile(job: Job) {
        const file = job.data.file;
        const filePath = file.path;
        const userData = await csv().fromFile(filePath);

        console.log(userData);

        for(const user of userData) {
            const input = {
                email: user.email,
                first_name: user.first_name,
                last_name: user.last_name,
            };
            const userCreated = await this.userService.createUser(input);
            console.log('User created -', userCreated.id );
        }

    }
    
}

In Kürze können wir sehen, dass wir den Job aus der Warteschlange verbrauchen und die Datei aus den Jobdaten abrufen. Beachten Sie, dass wir @Process(jobName) hinzufügen müssen zu der Methode, die den Job verbraucht. processFile Methode verbraucht den Job. Wir konvertieren CSV-Daten in JSON und verarbeiten dann jede Zeile, um einen Benutzer mit UserService. zu unserer Datenbank hinzuzufügen

Sobald Sie FileUploadProcessor erstellt haben , stellen Sie sicher, dass Sie diesen als Anbieter in Ihrem App-Modul registrieren.

Um dies zu zeigen, wenn ich die API über Postman ausführe, sehe ich die folgenden Daten in der Konsole:

[Nest] 21264  - 04/22/2022, 4:57:19 PM     LOG [NestFactory] Starting Nest application...
[Nest] 21264  - 04/22/2022, 4:57:20 PM     LOG [InstanceLoader] DiscoveryModule dependencies initialized +43ms
[Nest] 21264  - 04/22/2022, 4:57:20 PM     LOG [InstanceLoader] ConfigHostModule dependencies initialized +0ms
[Nest] 21264  - 04/22/2022, 4:57:20 PM     LOG [InstanceLoader] BullModule dependencies initialized +4ms
[Nest] 21264  - 04/22/2022, 4:57:20 PM     LOG [InstanceLoader] ConfigModule dependencies initialized +0ms
[Nest] 21264  - 04/22/2022, 4:57:20 PM     LOG [InstanceLoader] BullModule dependencies initialized +12ms
[Nest] 21264  - 04/22/2022, 4:57:20 PM     LOG [InstanceLoader] BullModule dependencies initialized +10ms
[Nest] 21264  - 04/22/2022, 4:57:20 PM     LOG [InstanceLoader] AppModule dependencies initialized +1ms
[Nest] 21264  - 04/22/2022, 4:57:20 PM     LOG [RoutesResolver] AppController {/api/bullqueuedemo}: +62ms
[Nest] 21264  - 04/22/2022, 4:57:20 PM     LOG [RouterExplorer] Mapped {/api/bullqueuedemo/uploadFile, POST} route +3ms
[Nest] 21264  - 04/22/2022, 4:57:20 PM     LOG [RouterExplorer] Mapped {/api/bullqueuedemo, GET} route +1ms
[Nest] 21264  - 04/22/2022, 4:57:20 PM     LOG [NestApplication] Nest application successfully started +582ms
created job 2
[
  {
    id: '1',
    email: '[email protected]',
    first_name: 'John',
    last_name: 'Doe'
  },
  {
    id: '2',
    email: '[email protected]',
    first_name: 'Jacob',
    last_name: 'Drake'
  },
  {
    id: '3',
    email: '[email protected]',
    first_name: 'Jos',
    last_name: 'Butler'
  }
]
User created - 1
User created - 2
User created - 3

Bull-Warteschlangen bieten eine Reihe von Funktionen:

  • Minimale CPU-Auslastung
  • Robustes Design basierend auf Redis
  • Gleichzeitigkeit
  • Wiederholen
  • Ratenbegrenzer
  • Ereignisüberwachung

Eine Frage, die immer wieder auftaucht, ist, wie wir diese Warteschlangen überwachen, wenn Jobs fehlschlagen oder angehalten werden. Eine einfache Lösung wäre die Verwendung von Redis CLI, aber Redis CLI ist nicht immer verfügbar, insbesondere in Produktionsumgebungen. Schließlich kommt ein einfaches UI-basiertes Dashboard – Bull Dashboard.

Integration von Bull Dashboard

Das Tolle an Bull-Warteschlangen ist, dass eine Benutzeroberfläche zur Verfügung steht, um die Warteschlangen zu überwachen. Man kann auch einige Optionen hinzufügen, die es einem Benutzer ermöglichen, Jobs zu wiederholen, die sich in einem fehlgeschlagenen Zustand befinden. Lassen Sie uns zwei Abhängigkeiten @bull-board/express installieren und @bull-board/api .

npm install @bull-board/express – Dadurch wird ein Express-Server-spezifischer Adapter installiert. Wenn Sie fastify verwenden mit Ihrer NestJS-Anwendung benötigen Sie @bull-board/fastify .

npm install @bull-board/api – Dies installiert eine Kernserver-API, die das Erstellen eines Bull-Dashboards ermöglicht.

Bullboard-Klasse hinzufügen

Wir werden eine Bullboard-Warteschlangenklasse erstellen, die einige Eigenschaften für uns festlegt. Es wird ein queuePool erstellt. Dieser Warteschlangenpool wird jedes Mal gefüllt, wenn eine neue Warteschlange eingefügt wird. Wir brauchen auch eine Methode getBullBoardQueues um alle Warteschlangen beim Laden der Benutzeroberfläche zu ziehen.


@Injectable()
export class BullBoardQueue { }

export const queuePool: Set = new Set();

export const getBullBoardQueues = (): BaseAdapter[] => {
    const bullBoardQueues = [...queuePool].reduce((acc: BaseAdapter[], val) => {
        acc.push(new BullAdapter(val))
        return acc
    }, []);

    return bullBoardQueues
}

Controller hinzufügen

Es gibt ein paar Möglichkeiten, wie wir auf die Benutzeroberfläche hätten zugreifen können, aber ich ziehe es vor, dies über einen Controller hinzuzufügen, damit mein Frontend die API aufrufen kann. Wir erstellen einen BullBoardController um unsere eingehende Anfrage, Antwort und als nächstes wie Express-Middleware abzubilden. In unserem Pfad für die Benutzeroberfläche haben wir einen Serveradapter für Express. Dadurch können wir einen Basispfad festlegen. Wir rufen alle bisher eingefügten Warteschlangen mit getBullBoardQueues ab oben beschriebene Methode. Wir verwenden dann createBullBoard API zum Abrufen von addQueue Methode. serverAdapter hat uns einen Router zur Verfügung gestellt, den wir verwenden, um eingehende Anfragen weiterzuleiten. Bevor wir diese Anfrage weiterleiten, müssen wir einen kleinen Hack machen, indem wir entryPointPath durch / ersetzen .


@Controller('/queues/admin')
export class BullBoardController{
    
    @All('*')
    admin(
        @Request() req: express.Request,
        @Response() res: express.Response,
        @Next() next: express.NextFunction,
    ) {
        const serverAdapter = new ExpressAdapter();
        serverAdapter.setBasePath('/queues/admin');
        const queues = getBullBoardQueues();
        const router = serverAdapter.getRouter() as express.Express;
        const { addQueue } = createBullBoard({
            queues: [],
            serverAdapter,
        });
        queues.forEach((queue: BaseAdapter) => {
            addQueue(queue);
        });
        const entryPointPath = '/queues/admin/';
        req.url = req.url.replace(entryPointPath, '/');
        router(req, res, next);
    }
}

Wenn wir nun unsere Anwendung ausführen und auf die Benutzeroberfläche zugreifen, sehen wir eine nette Benutzeroberfläche für Bull Dashboard wie unten:

Das Schöne an dieser Benutzeroberfläche ist schließlich, dass Sie alle getrennten Optionen sehen können.

Schlussfolgerung

Bull-Warteschlangen sind eine großartige Funktion, um einige ressourcenintensive Aufgaben zu verwalten. In diesem Beitrag haben wir gelernt, wie wir Bull-Warteschlangen in unserer NestJS-Anwendung hinzufügen können. Wir haben auch einfach ein Bull Board in unsere Anwendung integriert, um diese Warteschlangen zu verwalten. Den Code für diesen Beitrag finden Sie hier.

Möchten Sie weitere Beiträge über NestJS lesen? Senden Sie mir hier Ihr Feedback.


No
Java-Tag