Verwenden von Bull-Warteschlangen in der NestJS-Anwendung
In vielen Szenarien müssen Sie asynchrone CPU-intensive Aufgaben bewältigen. Insbesondere, wenn eine Anwendung Daten über die REST-API anfordert. Der REST-Endpunkt sollte innerhalb eines begrenzten Zeitrahmens antworten.
In diesem Beitrag werde ich zeigen, wie wir Warteschlangen verwenden können, um asynchrone Aufgaben zu bewältigen. Wir werden Bull-Warteschlangen in einer einfachen NestJS-Anwendung verwenden.
Warteschlangen sind eine Datenstruktur, die einer linearen Reihenfolge folgt. In den meisten Systemen verhalten sich Warteschlangen wie eine Reihe von Aufgaben. Ein Herausgeber veröffentlicht eine Nachricht oder Aufgabe in der Warteschlange. Ein Verbraucher nimmt diese Nachricht zur weiteren Verarbeitung auf. Dies kann asynchron geschehen und bietet die dringend benötigte Ruhepause für CPU-intensive Aufgaben. Sobald der Verbraucher die Nachricht konsumiert hat, steht die Nachricht keinem anderen Verbraucher zur Verfügung.
Bull-Warteschlangen basieren auf Redis. In meinem vorherigen Beitrag habe ich behandelt, wie man eine Zustandsprüfung für Redis oder eine Datenbank in einer NestJS-Anwendung hinzufügt.
- NestJS-Anwendung einrichten
- Bull-Warteschlangen in der NestJS-Anwendung
- Implementieren eines Prozessors zum Verarbeiten von Warteschlangendaten
- Integration von Bull Dashboard
- Bull Board-Klasse hinzufügen
- Controller hinzufügen
- Schlussfolgerung
NestJS-Anwendung einrichten
Im Rahmen dieser Demo erstellen wir eine einfache Anwendung. Wir werden Benutzerdaten über eine CSV-Datei hochladen. Ein Controller akzeptiert diese Datei und übergibt sie an eine Warteschlange. Ein Prozessor übernimmt den Auftrag in der Warteschlange und verarbeitet die Datei, um Daten aus der CSV-Datei in der Datenbank zu speichern.
nest new bullqueuedemo
Sobald dieser Befehl den Ordner für bullqueuedemo
erstellt , richten wir Prisma ORM ein, um eine Verbindung zur Datenbank herzustellen. (Hinweis – stellen Sie sicher, dass Sie Prisma-Abhängigkeiten installieren.).
npx prisma init
Wenn Sie einen Windows-Computer verwenden, tritt möglicherweise ein Fehler beim Ausführen von prisma init auf. Alles in allem sollten Sie eine Umgebungsvariable einrichten, um diesen Fehler zu vermeiden.
set PRISMA_CLI_QUERY_ENGINE_TYPE=binary
set PRISMA_CLIENT_ENGINE_TYPE=binary
Sobald das Schema erstellt ist, aktualisieren wir es mit unseren Datenbanktabellen. Für diese Demo erstellen wir eine einzelne Tabelle user
.
// This is your Prisma schema file,
// learn more about it in the docs: https://pris.ly/d/prisma-schema
generator client {
provider = "prisma-client-js"
engineType = "binary"
}
datasource db {
provider = "mysql"
url = env("DATABASE_URL")
}
model User {
id Int @default(autoincrement()) @id
email String @unique
first_name String
last_name String?
}
Wenn wir jetzt npm run prisma migrate dev
ausführen , wird eine Datenbanktabelle erstellt.
Zusammenfassend haben wir bisher eine NestJS-Anwendung erstellt und unsere Datenbank mit Prisma ORM eingerichtet. Schauen wir uns die Konfiguration an, die wir für Bull Queue hinzufügen müssen.
Bull-Warteschlangen in der NestJS-Anwendung
Installieren Sie @nestjs/bull
Abhängigkeit. Diese Abhängigkeit kapselt die bull-Bibliothek. Wir gehen davon aus, dass Sie redis
haben installiert und läuft. Standardmäßig wird Redis auf Port 6379 ausgeführt.
Wir werden REDIS_HOST
hinzufügen und REDIS_PORT
als Umgebungsvariablen in unserem .env
Datei. Installieren Sie zwei Abhängigkeiten für Bull wie folgt:
npm install @nestjs/bull
npm install @types/bull
Anschließend richten wir die Verbindung mit Redis ein, indem wir BullModule
hinzufügen zu unserem App-Modul.
@Module({
imports: [
BullModule.forRootAsync({
imports: [ConfigModule],
useFactory: async (configService: ConfigService) => ({
redis: {
host: configService.get('REDIS_HOST'),
port: Number(configService.get('REDIS_PORT')),
},
}),
inject: [ConfigService]
}),
BullModule.registerQueue({
name: 'file-upload-queue'
}),
],
controllers: [AppController, BullBoardController],
providers: [UserService, PrismaService, FileUploadProcessor,],
})
export class AppModule {}
Wir injizieren ConfigService. Dieser Dienst ermöglicht es uns, Umgebungsvariablen zur Laufzeit abzurufen. Damit können wir BullModule
verwenden in unserer Anwendung.
Wie Sie im obigen Code sehen können, haben wir BullModule.registerQueue
und das registriert unsere Warteschlange file-upload-queue
. Lassen Sie uns nun diese Warteschlange in unserem Controller hinzufügen, wo sie verwendet wird.
@Controller('/api/bullqueuedemo')
export class AppController {
constructor(@InjectQueue('file-upload-queue') private fileQueue: Queue) {
queuePool.add(fileQueue);
}
@Post('/uploadFile')
@UseInterceptors(FileInterceptor("csv", {
storage: diskStorage({
destination: './csv',
fileName: (req, file, cb) => {
const randomName = Array(32).fill(null).map(() => (Math.round(Math.random() * cb(null, `${randomName}${extname(file.originalname)}`))))
}
})
}))
async uploadCsvFile(@UploadedFile() file): Promise {
const job = await this.fileQueue.add('csvfilejob', {file: file});
console.log(`created job ${ job.id}`);
}
@Get('/')
async getHello(): Promise {
return "Hello World";
}
}
Lassen Sie uns diesen Code langsam durchgehen, um zu verstehen, was passiert.
- Im Konstruktor fügen wir die Warteschlange
InjectQueue('file-upload-queue')
ein . - Unsere POST-API dient zum Hochladen einer CSV-Datei.
- Wir verwenden einen FileInterceptor. Dies ist eine Funktion, die NestJS anbietet, um die Anfrage abzufangen und Dateien aus der Anfrage zu extrahieren. Dieser Interceptor nimmt zwei Argumente
fieldName
undoptions
. storage
Option ermöglicht es uns, die hochgeladene Datei in einem Ordner namenscsv
zu speichern im aktuellen Ausführungsverzeichnis. Die hochgeladene Datei wird mit einem zufällig generierten Namen und der Erweiterung.csv
umbenannt .- In der Methode
uploadCsvFile
, erhalten wir die hochgeladene Datei. Dies kommt von unserem FileInterceptor. Wir verwenden unsere injizierte Warteschlange, um einen Job mit dem Namencsvfilejob
hinzuzufügen und Daten, die die Datei enthalten.
Implementieren eines Prozessors zum Verarbeiten von Warteschlangendaten
Danach haben wir unserer Warteschlange file-upload-queue
einen Job hinzugefügt . Um diesen Job nun weiter zu verarbeiten, implementieren wir einen Prozessor FileUploadProcessor
.
Wir werden diesen Verbraucher mit @Processor('file-upload-queue')
annotieren .
@Processor('file-upload-queue')
export class FileUploadProcessor {
constructor(private readonly userService: UserService){}
@Process('csvfilejob')
async processFile(job: Job) {
const file = job.data.file;
const filePath = file.path;
const userData = await csv().fromFile(filePath);
console.log(userData);
for(const user of userData) {
const input = {
email: user.email,
first_name: user.first_name,
last_name: user.last_name,
};
const userCreated = await this.userService.createUser(input);
console.log('User created -', userCreated.id );
}
}
}
In Kürze können wir sehen, dass wir den Job aus der Warteschlange verbrauchen und die Datei aus den Jobdaten abrufen. Beachten Sie, dass wir @Process(jobName)
hinzufügen müssen zu der Methode, die den Job verbraucht. processFile
Methode verbraucht den Job. Wir konvertieren CSV-Daten in JSON und verarbeiten dann jede Zeile, um einen Benutzer mit UserService.
zu unserer Datenbank hinzuzufügen
Sobald Sie FileUploadProcessor
erstellt haben , stellen Sie sicher, dass Sie diesen als Anbieter in Ihrem App-Modul registrieren.
Um dies zu zeigen, wenn ich die API über Postman ausführe, sehe ich die folgenden Daten in der Konsole:
[Nest] 21264 - 04/22/2022, 4:57:19 PM LOG [NestFactory] Starting Nest application...
[Nest] 21264 - 04/22/2022, 4:57:20 PM LOG [InstanceLoader] DiscoveryModule dependencies initialized +43ms
[Nest] 21264 - 04/22/2022, 4:57:20 PM LOG [InstanceLoader] ConfigHostModule dependencies initialized +0ms
[Nest] 21264 - 04/22/2022, 4:57:20 PM LOG [InstanceLoader] BullModule dependencies initialized +4ms
[Nest] 21264 - 04/22/2022, 4:57:20 PM LOG [InstanceLoader] ConfigModule dependencies initialized +0ms
[Nest] 21264 - 04/22/2022, 4:57:20 PM LOG [InstanceLoader] BullModule dependencies initialized +12ms
[Nest] 21264 - 04/22/2022, 4:57:20 PM LOG [InstanceLoader] BullModule dependencies initialized +10ms
[Nest] 21264 - 04/22/2022, 4:57:20 PM LOG [InstanceLoader] AppModule dependencies initialized +1ms
[Nest] 21264 - 04/22/2022, 4:57:20 PM LOG [RoutesResolver] AppController {/api/bullqueuedemo}: +62ms
[Nest] 21264 - 04/22/2022, 4:57:20 PM LOG [RouterExplorer] Mapped {/api/bullqueuedemo/uploadFile, POST} route +3ms
[Nest] 21264 - 04/22/2022, 4:57:20 PM LOG [RouterExplorer] Mapped {/api/bullqueuedemo, GET} route +1ms
[Nest] 21264 - 04/22/2022, 4:57:20 PM LOG [NestApplication] Nest application successfully started +582ms
created job 2
[
{
id: '1',
email: '[email protected]',
first_name: 'John',
last_name: 'Doe'
},
{
id: '2',
email: '[email protected]',
first_name: 'Jacob',
last_name: 'Drake'
},
{
id: '3',
email: '[email protected]',
first_name: 'Jos',
last_name: 'Butler'
}
]
User created - 1
User created - 2
User created - 3
Bull-Warteschlangen bieten eine Reihe von Funktionen:
- Minimale CPU-Auslastung
- Robustes Design basierend auf Redis
- Gleichzeitigkeit
- Wiederholen
- Ratenbegrenzer
- Ereignisüberwachung
Eine Frage, die immer wieder auftaucht, ist, wie wir diese Warteschlangen überwachen, wenn Jobs fehlschlagen oder angehalten werden. Eine einfache Lösung wäre die Verwendung von Redis CLI, aber Redis CLI ist nicht immer verfügbar, insbesondere in Produktionsumgebungen. Schließlich kommt ein einfaches UI-basiertes Dashboard – Bull Dashboard.
Integration von Bull Dashboard
Das Tolle an Bull-Warteschlangen ist, dass eine Benutzeroberfläche zur Verfügung steht, um die Warteschlangen zu überwachen. Man kann auch einige Optionen hinzufügen, die es einem Benutzer ermöglichen, Jobs zu wiederholen, die sich in einem fehlgeschlagenen Zustand befinden. Lassen Sie uns zwei Abhängigkeiten @bull-board/express
installieren und @bull-board/api
.
npm install @bull-board/express
– Dadurch wird ein Express-Server-spezifischer Adapter installiert. Wenn Sie fastify
verwenden mit Ihrer NestJS-Anwendung benötigen Sie @bull-board/fastify
.
npm install @bull-board/api
– Dies installiert eine Kernserver-API, die das Erstellen eines Bull-Dashboards ermöglicht.
Bullboard-Klasse hinzufügen
Wir werden eine Bullboard-Warteschlangenklasse erstellen, die einige Eigenschaften für uns festlegt. Es wird ein queuePool erstellt. Dieser Warteschlangenpool wird jedes Mal gefüllt, wenn eine neue Warteschlange eingefügt wird. Wir brauchen auch eine Methode getBullBoardQueues
um alle Warteschlangen beim Laden der Benutzeroberfläche zu ziehen.
@Injectable()
export class BullBoardQueue { }
export const queuePool: Set = new Set();
export const getBullBoardQueues = (): BaseAdapter[] => {
const bullBoardQueues = [...queuePool].reduce((acc: BaseAdapter[], val) => {
acc.push(new BullAdapter(val))
return acc
}, []);
return bullBoardQueues
}
Controller hinzufügen
Es gibt ein paar Möglichkeiten, wie wir auf die Benutzeroberfläche hätten zugreifen können, aber ich ziehe es vor, dies über einen Controller hinzuzufügen, damit mein Frontend die API aufrufen kann. Wir erstellen einen BullBoardController
um unsere eingehende Anfrage, Antwort und als nächstes wie Express-Middleware abzubilden. In unserem Pfad für die Benutzeroberfläche haben wir einen Serveradapter für Express. Dadurch können wir einen Basispfad festlegen. Wir rufen alle bisher eingefügten Warteschlangen mit getBullBoardQueues
ab oben beschriebene Methode. Wir verwenden dann createBullBoard
API zum Abrufen von addQueue
Methode. serverAdapter
hat uns einen Router zur Verfügung gestellt, den wir verwenden, um eingehende Anfragen weiterzuleiten. Bevor wir diese Anfrage weiterleiten, müssen wir einen kleinen Hack machen, indem wir entryPointPath durch /
ersetzen .
@Controller('/queues/admin')
export class BullBoardController{
@All('*')
admin(
@Request() req: express.Request,
@Response() res: express.Response,
@Next() next: express.NextFunction,
) {
const serverAdapter = new ExpressAdapter();
serverAdapter.setBasePath('/queues/admin');
const queues = getBullBoardQueues();
const router = serverAdapter.getRouter() as express.Express;
const { addQueue } = createBullBoard({
queues: [],
serverAdapter,
});
queues.forEach((queue: BaseAdapter) => {
addQueue(queue);
});
const entryPointPath = '/queues/admin/';
req.url = req.url.replace(entryPointPath, '/');
router(req, res, next);
}
}
Wenn wir nun unsere Anwendung ausführen und auf die Benutzeroberfläche zugreifen, sehen wir eine nette Benutzeroberfläche für Bull Dashboard wie unten:
Das Schöne an dieser Benutzeroberfläche ist schließlich, dass Sie alle getrennten Optionen sehen können.
Schlussfolgerung
Bull-Warteschlangen sind eine großartige Funktion, um einige ressourcenintensive Aufgaben zu verwalten. In diesem Beitrag haben wir gelernt, wie wir Bull-Warteschlangen in unserer NestJS-Anwendung hinzufügen können. Wir haben auch einfach ein Bull Board in unsere Anwendung integriert, um diese Warteschlangen zu verwalten. Den Code für diesen Beitrag finden Sie hier.
Möchten Sie weitere Beiträge über NestJS lesen? Senden Sie mir hier Ihr Feedback.