Advertisement
Uno2K

worker

Apr 10th, 2025
775
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1. import { Processor, WorkerHost, InjectQueue } from '@nestjs/bullmq';
  2. import { Job, Queue } from 'bullmq';
  3. import { OpenAIService } from '../../infrastructure/openai/openai.service';
  4. import { Logger } from '@nestjs/common';
  5. import { ConfigService } from '@nestjs/config';
  6. import { z } from 'zod';
  7. import zodToJsonSchema from 'zod-to-json-schema';
  8.  
  // TODO: Some emails fail to be classified because the model API rejects the
  // request with HTTP 429 (Too Many Requests, i.e. the request was rate limited).
  // Example log output:
  /**
   * [Nest] 46190  - 04/10/2025, 6:42:23 PM   ERROR [CategorizationWorker] Failed to process categorization job 289: 429 status code (no body)
   * Error: 429 status code (no body)
   */
  14.  
  /**
   * System prompt for the email classifier.
   *
   * The email itself is sent separately as the user message, wrapped in
   * XML-style tags. The category names listed here must match the `category`
   * enum of `emailClassificationSchema` exactly, because the model's JSON
   * output is validated against that schema. Categories are listed by name
   * only, rather than numbered, so the model does not answer with a number.
   */
  const systemPrompt = `
Act as an email classification expert. Classify the email in the user message into exactly one of the following categories:

- To respond: Emails you need to respond to.
- FYI: Emails that don't require your response, but are important.
- Comment: Team chats in tools like Google Docs or Microsoft Office.
- Notification: Automated updates from tools you use.
- Meeting update: Calendar updates from Zoom, Google Meet, etc.
- Awaiting reply: Emails you've sent that you're expecting a reply to.
- Actioned: Emails you've sent that you're not expecting a reply to.
- Marketing: Marketing or cold emails.

The email is provided as XML-style tags, for example <email_from>, <email_subject>, <email_content> and <email_labels>. Any tag may be missing or empty; classify using whatever information is available.

Respond with a JSON object containing:
- category: the exact category name from the list above,
- confidence: a number from 0 to 1 expressing how confident you are in the classification,
- reason: a short explanation of why the email belongs to that category.
`;
  44.  
  /**
   * Allowed classification categories.
   *
   * These names must match the category names described in `systemPrompt`.
   */
  const emailCategory = z.enum([
    'To respond',
    'FYI',
    'Comment',
    'Notification',
    'Meeting update',
    'Awaiting reply',
    'Actioned',
    'Marketing',
  ]);

  /**
   * Shape of the classification the model must return.
   *
   * This schema is converted to JSON Schema to request structured output.
   * It is also used to validate the parsed model response before use.
   */
  const emailClassificationSchema = z.object({
    category: emailCategory,
    // The prompt asks for a score in [0, 1]; the schema does not enforce a range.
    confidence: z.number(),
    reason: z.string(),
  });
  59.  
  /**
   * Payload of a job on `categorization-queue`, consumed by
   * `CategorizationWorker.process`.
   */
  type CategorizationJobData = {
    /** User identifier; forwarded unchanged to the `add-label` job. */
    userId: string;
    /** Email identifier; forwarded unchanged to the `add-label` job. */
    emailId: string;
    /** Sender as supplied by the producer (format is not validated here). */
    from: string;
    /** Subject line sent to the model. */
    subject: string;
    /** Email date as a string. NOTE(review): format is not enforced here; confirm with the producer. */
    date: string;
    /** Body text sent to the model. */
    emailBody: string;
    /** Labels associated with the email; sent to the model as a comma-separated list. */
    labels: string[];
  };
  69.  
  /**
   * Number of SDK-level retries per model request when `openai.maxRetries` is
   * not configured. The OpenAI SDK retries 429 and 5xx responses with
   * exponential backoff; its own default (2) still let rate-limited requests
   * fail the whole job.
   */
  const DEFAULT_OPENAI_MAX_RETRIES = 5;

  /** Validated classification produced by the model. */
  type EmailClassification = ReturnType<typeof emailClassificationSchema.parse>;

  /** Value returned from `CategorizationWorker.process` (the BullMQ job's return value). */
  type CategorizationOutcome = {
    message: string;
    result: EmailClassification;
  };

  @Processor('categorization-queue')
  export class CategorizationWorker extends WorkerHost {
    private readonly logger = new Logger(CategorizationWorker.name);

    constructor(
      // Unused while the summarization step in `process` is commented out; kept
      // so the injection signature does not change when it is re-enabled.
      @InjectQueue('summarization-queue')
      private readonly summarizationQueue: Queue,
      @InjectQueue('label-adding-queue')
      private readonly labelAddingQueue: Queue,
      private readonly openaiService: OpenAIService,
      private readonly configService: ConfigService,
    ) {
      super();
    }

    /**
     * Classifies one email with the configured model and enqueues an
     * `add-label` job carrying the resulting category.
     *
     * @param job - BullMQ job whose data describes the email to classify.
     * @returns The validated classification plus a status message.
     * @throws Error if `openai.model` is not configured, if the model returns an
     *   empty response, or if the request still fails after the SDK's retries.
     * @throws SyntaxError / ZodError if the response is not valid JSON or does
     *   not match `emailClassificationSchema`.
     *
     * Errors are logged and rethrown, so BullMQ marks the job as failed. BullMQ
     * only retries the job if it was enqueued with `attempts`. If 429s persist,
     * consider `attempts`/`backoff` on the producer or a `limiter` on this
     * processor.
     */
    override async process(
      job: Job<CategorizationJobData>,
    ): Promise<CategorizationOutcome> {
      this.logger.log(`Starting categorization for job: ${job.id}`);
      try {
        const model = this.configService.get<string>('openai.model');
        if (!model) {
          throw new Error('Missing configuration value: openai.model');
        }
        const openai = this.openaiService.getClient();

        // GPT 4o-mini via the Responses API.
        const response = await openai.responses.create(
          {
            model,
            input: [
              { role: 'system', content: systemPrompt },
              { role: 'user', content: this.buildUserMessage(job.data) },
            ],
            text: {
              format: {
                type: 'json_schema',
                name: 'email_classification',
                schema: zodToJsonSchema(emailClassificationSchema),
              },
            },
          },
          // Let the SDK absorb rate limiting (HTTP 429) and transient 5xx errors
          // with its built-in backoff instead of failing the job outright.
          { maxRetries: this.resolveMaxRetries() },
        );

        // Gemini 2.0 Flash
        // const response = await openai.chat.completions.create({
        //   model: this.configService.get<string>('openai.model') as string,
        //   messages: [
        //     { role: 'system', content: systemPrompt },
        //     {
        //       role: 'user',
        //       content: `<email_from>${job.data.from}</email_from>
        //                 <email_subject>${job.data.subject}</email_subject>
        //                 <email_body>${job.data.emailBody}</email_body>
        //                 <email_labels>${job.data.labels.join(', ')}</email_labels>`,
        //     },
        //   ],
        //   response_format: zodResponseFormat(
        //     emailClassificationSchema,
        //     'email_classification',
        //   ),
        // });

        if (!response.output_text) {
          this.logger.error(`Empty response from OpenAI for job: ${job.id}`);
          throw new Error('Empty response from OpenAI');
        }

        // if (!response.choices[0].message.content) {
        //   this.logger.error(`Empty response from OpenAI for job: ${job.id}`);
        //   throw new Error('Empty response from OpenAI');
        // }

        // const parsedOutput: unknown = JSON.parse(
        //   response.choices[0].message.content,
        // );

        // JSON.parse rejects malformed output. The schema then rejects valid JSON
        // that is not a classification, and strips unexpected keys.
        const parsedOutput: unknown = JSON.parse(response.output_text);
        const classification = emailClassificationSchema.parse(parsedOutput);

        this.logger.log(
          `Categorization for job ${job.id}: ${JSON.stringify(classification)}`,
        );

        await this.labelAddingQueue.add('add-label', {
          userId: job.data.userId,
          emailId: job.data.emailId,
          category: classification.category,
        });

        // Next step: enqueue a summarization job with the categorization result
        // await this.summarizationQueue.add('summarize-email', {
        //   emailId: job.data.emailId,
        //   ...classification,
        //   subject: job.data.subject,
        //   from: job.data.from,
        //   date: job.data.date,
        //   emailBody: job.data.emailBody,
        // });

        return {
          message: `Categorization job ${job.id} completed successfully.`,
          result: classification,
        };
      } catch (error) {
        this.logger.error(
          `Failed to process categorization job ${job.id}: ${error instanceof Error ? error.message : 'Unknown error'}`,
          error instanceof Error ? error.stack : undefined,
        );
        throw error;
      }
    }

    /**
     * Renders the email as XML-style tagged fields for the user message.
     *
     * Fields are joined with '\n', so no source-code indentation leaks into the
     * prompt. The date is included because the system prompt lists it as
     * useful metadata.
     */
    private buildUserMessage(data: CategorizationJobData): string {
      // Job payloads are untyped at runtime, so guard against a missing array.
      const labels = data.labels ?? [];
      return [
        `<email_from>${data.from}</email_from>`,
        `<email_subject>${data.subject}</email_subject>`,
        `<email_date>${data.date}</email_date>`,
        `<email_content>${data.emailBody}</email_content>`,
        `<email_labels>${labels.join(', ')}</email_labels>`,
      ].join('\n');
    }

    /**
     * Per-request retry budget for the OpenAI SDK.
     *
     * Read from the optional `openai.maxRetries` config key. Env-backed config
     * values may be strings, so the value is coerced. Anything that is not a
     * non-negative integer falls back to DEFAULT_OPENAI_MAX_RETRIES.
     */
    private resolveMaxRetries(): number {
      const raw = this.configService.get<unknown>('openai.maxRetries');
      if (raw === undefined || raw === null || raw === '') {
        return DEFAULT_OPENAI_MAX_RETRIES;
      }
      const parsed = Number(raw);
      return Number.isInteger(parsed) && parsed >= 0
        ? parsed
        : DEFAULT_OPENAI_MAX_RETRIES;
    }
  }
  189.  
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement