Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import { Processor, WorkerHost, InjectQueue } from '@nestjs/bullmq';
- import { Job, Queue } from 'bullmq';
- import { OpenAIService } from '../../infrastructure/openai/openai.service';
- import { Logger } from '@nestjs/common';
- import { ConfigService } from '@nestjs/config';
- import { z } from 'zod';
- import zodToJsonSchema from 'zod-to-json-schema';
- // TODO: Some emails are failing to be classified.
- /**
- * [Nest] 46190 - 04/10/2025, 6:42:23 PM ERROR [CategorizationWorker] Failed to process categorization job 289: 429 status code (no body)
- * Error: 429 status code (no body)
- */
- const systemPrompt = `
- Act as an email classification expert. Classify the following email message into one of the following categories:
- 1 (To respond): Emails you need to respond to.
- 2 (FYI): Emails that don't require your response, but are important.
- 3 (Comment): Team chats in tools like Google Docs or Microsoft Office.
- 4 (Notification): Automated updates from tools you use.
- 5 (Meeting update): Calendar updates from Zoom, Google Meet, etc.
- 6 (Awaiting reply): Emails you've sent that you're expecting a reply to.
- 7 (Actioned): Emails you've sent that you're not expecting a reply to.
- 8 (Marketing): Marketing or cold emails.
- Consider the following metadata provided as variables:
- {
- "sender": "{sender}",
- "subject": "{subject}",
- "body": "{body}",
- "labels": "{labels}",
- "attachments": "{attachments}",
- "date": "{date}",
- "recipients": "{recipients}"
- }
- Provide a confidence score (from 0 to 1) for your classification:
- category: the classification category (as a name),
- confidence: the confidence score,
- reason: the reason for the classification.
- `;
- const emailClassificationSchema = z.object({
- category: z.enum([
- 'To respond',
- 'FYI',
- 'Comment',
- 'Notification',
- 'Meeting update',
- 'Awaiting reply',
- 'Actioned',
- 'Marketing',
- ]),
- confidence: z.number(),
- reason: z.string(),
- });
- interface CategorizationJobData {
- userId: string;
- emailId: string;
- subject: string;
- from: string;
- date: string;
- emailBody: string;
- labels: string[];
- }
- @Processor('categorization-queue')
- export class CategorizationWorker extends WorkerHost {
- private readonly logger = new Logger(CategorizationWorker.name);
- constructor(
- @InjectQueue('summarization-queue')
- private readonly summarizationQueue: Queue,
- @InjectQueue('label-adding-queue')
- private readonly labelAddingQueue: Queue,
- private readonly openaiService: OpenAIService,
- private readonly configService: ConfigService,
- ) {
- super();
- }
- async process(job: Job<CategorizationJobData>): Promise<any> {
- this.logger.log(`Starting categorization for job: ${job.id}`);
- try {
- const openai = this.openaiService.getClient();
- // GPT 4o-mini
- const response = await openai.responses.create({
- model: this.configService.get<string>('openai.model') as string,
- input: [
- { role: 'system', content: systemPrompt },
- {
- role: 'user',
- content: `<email_from>${job.data.from}</email_from>
- <email_subject>${job.data.subject}</email_subject>
- <email_content>${job.data.emailBody}</email_content>
- <email_labels>${job.data.labels.join(', ')}</email_labels>`,
- },
- ],
- text: {
- format: {
- type: 'json_schema',
- name: 'email_classification',
- schema: zodToJsonSchema(emailClassificationSchema),
- },
- },
- });
- // Gemini 2.0 Flash
- // const response = await openai.chat.completions.create({
- // model: this.configService.get<string>('openai.model') as string,
- // messages: [
- // { role: 'system', content: systemPrompt },
- // {
- // role: 'user',
- // content: `<email_from>${job.data.from}</email_from>
- // <email_subject>${job.data.subject}</email_subject>
- // <email_body>${job.data.emailBody}</email_body>
- // <email_labels>${job.data.labels.join(', ')}</email_labels>`,
- // },
- // ],
- // response_format: zodResponseFormat(
- // emailClassificationSchema,
- // 'email_classification',
- // ),
- // });
- if (!response.output_text) {
- this.logger.error(`Empty response from OpenAI for job: ${job.id}`);
- throw new Error('Empty response from OpenAI');
- }
- // if (!response.choices[0].message.content) {
- // this.logger.error(`Empty response from OpenAI for job: ${job.id}`);
- // throw new Error('Empty response from OpenAI');
- // }
- // const parsedOutput: unknown = JSON.parse(
- // response.choices[0].message.content,
- // );
- const parsedOutput: unknown = JSON.parse(response.output_text);
- const classificationEmail = emailClassificationSchema.parse(parsedOutput);
- const categorizationResult = {
- category: classificationEmail.category,
- confidence: classificationEmail.confidence,
- reason: classificationEmail.reason,
- };
- this.logger.log(
- `Categorization for job ${job.id}:`,
- categorizationResult,
- );
- await this.labelAddingQueue.add('add-label', {
- userId: job.data.userId,
- emailId: job.data.emailId,
- category: categorizationResult.category,
- });
- // Next step: enqueue a summarization job with the categorization result
- // await this.summarizationQueue.add('summarize-email', {
- // emailId: job.data.emailId,
- // ...categorizationResult,
- // subject: job.data.subject,
- // from: job.data.from,
- // date: job.data.date,
- // emailBody: job.data.emailBody,
- // });
- return {
- message: `Categorization job ${job.id} completed successfully.`,
- result: categorizationResult,
- };
- } catch (error) {
- this.logger.error(
- `Failed to process categorization job ${job.id}: ${error instanceof Error ? error.message : 'Unknown error'}`,
- error instanceof Error ? error.stack : undefined,
- );
- throw error;
- }
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement