diff --git a/backend/src/app.module.ts b/backend/src/app.module.ts index fa47d08..f09fa13 100644 --- a/backend/src/app.module.ts +++ b/backend/src/app.module.ts @@ -1,5 +1,6 @@ import { Module } from '@nestjs/common'; import { ConfigModule } from '@nestjs/config'; +import { BullModule } from '@nestjs/bullmq'; import { PrismaModule } from './prisma/prisma.module'; import { TenantModule } from './tenant/tenant.module'; import { AuthModule } from './auth/auth.module'; @@ -17,6 +18,12 @@ import { KnowledgeModule } from './knowledge/knowledge.module'; ConfigModule.forRoot({ isGlobal: true, }), + BullModule.forRoot({ + connection: { + host: process.env.REDIS_HOST || 'platform-redis', + port: parseInt(process.env.REDIS_PORT || '6379', 10), + }, + }), PrismaModule, TenantModule, AuthModule, diff --git a/backend/src/knowledge/knowledge.module.ts b/backend/src/knowledge/knowledge.module.ts index 98803e7..d5b62ef 100644 --- a/backend/src/knowledge/knowledge.module.ts +++ b/backend/src/knowledge/knowledge.module.ts @@ -1,16 +1,31 @@ import { Module } from '@nestjs/common'; +import { BullModule } from '@nestjs/bullmq'; import { KnowledgeController } from './knowledge.controller'; import { CommentService } from './services/comment.service'; import { SemanticOrchestratorService } from './services/semantic-orchestrator.service'; import { SemanticChunkerService } from './services/semantic-chunker.service'; import { SemanticLinkService } from './services/semantic-link.service'; +import { SemanticRefreshQueueService } from './services/semantic-refresh-queue.service'; +import { SemanticRefreshProcessor } from './semantic-refresh.processor'; import { TenantModule } from '../tenant/tenant.module'; import { MeilisearchModule } from '../search/meilisearch.module'; +import { SEMANTIC_REFRESH_QUEUE } from './semantic-refresh.constants'; @Module({ - imports: [TenantModule, MeilisearchModule], + imports: [ + TenantModule, + MeilisearchModule, + BullModule.registerQueue({ name: SEMANTIC_REFRESH_QUEUE }), + ], controllers: [KnowledgeController], - providers: [CommentService, SemanticOrchestratorService, SemanticChunkerService, SemanticLinkService], - exports: [SemanticOrchestratorService], + providers: [ + CommentService, + SemanticOrchestratorService, + SemanticChunkerService, + SemanticLinkService, + SemanticRefreshQueueService, + SemanticRefreshProcessor, + ], + exports: [SemanticOrchestratorService, SemanticRefreshQueueService], }) export class KnowledgeModule {} diff --git a/backend/src/knowledge/semantic-refresh.constants.ts b/backend/src/knowledge/semantic-refresh.constants.ts new file mode 100644 index 0000000..519da64 --- /dev/null +++ b/backend/src/knowledge/semantic-refresh.constants.ts @@ -0,0 +1,3 @@ +export const SEMANTIC_REFRESH_QUEUE = 'semantic-refresh'; + +export const SEMANTIC_REFRESH_JOB = 'refresh-record'; diff --git a/backend/src/knowledge/semantic-refresh.processor.ts b/backend/src/knowledge/semantic-refresh.processor.ts new file mode 100644 index 0000000..d0bb072 --- /dev/null +++ b/backend/src/knowledge/semantic-refresh.processor.ts @@ -0,0 +1,45 @@ +import { Processor, WorkerHost } from '@nestjs/bullmq'; +import { Logger } from '@nestjs/common'; +import { Job } from 'bullmq'; +import { SemanticOrchestratorService } from './services/semantic-orchestrator.service'; +import { SEMANTIC_REFRESH_QUEUE } from './semantic-refresh.constants'; + +export type SemanticRefreshJobData = { + tenantId: string; + objectApiName: string; + recordId: string; + userId?: string; + trigger: string; +}; + +@Processor(SEMANTIC_REFRESH_QUEUE) +export class SemanticRefreshProcessor extends WorkerHost { + private readonly logger = new Logger(SemanticRefreshProcessor.name); + + constructor( + private readonly semanticOrchestratorService: SemanticOrchestratorService, + ) { + super(); + } + + async process(job: Job): Promise { + const { tenantId, objectApiName, recordId, userId, trigger } = job.data; + this.logger.log( + `Processing semantic refresh: ${objectApiName}:${recordId} trigger=${trigger}`, + ); + try { + await this.semanticOrchestratorService.refreshRecord( + tenantId, + objectApiName, + recordId, + userId, + trigger, + ); + } catch (error) { + this.logger.error( + `Semantic refresh failed: ${objectApiName}:${recordId} trigger=${trigger} error=${error.message}`, + ); + throw error; // Let BullMQ handle retries + } + } +} diff --git a/backend/src/knowledge/services/comment.service.ts b/backend/src/knowledge/services/comment.service.ts index ad1e2ac..3b04599 100644 --- a/backend/src/knowledge/services/comment.service.ts +++ b/backend/src/knowledge/services/comment.service.ts @@ -1,13 +1,13 @@ import { ForbiddenException, Injectable, NotFoundException } from '@nestjs/common'; import { TenantDatabaseService } from '../../tenant/tenant-database.service'; import { CreateCommentDto, UpdateCommentDto } from '../dto/comment.dto'; -import { SemanticOrchestratorService } from './semantic-orchestrator.service'; +import { SemanticRefreshQueueService } from './semantic-refresh-queue.service'; @Injectable() export class CommentService { constructor( private readonly tenantDbService: TenantDatabaseService, - private readonly semanticOrchestratorService: SemanticOrchestratorService, + private readonly semanticRefreshQueue: SemanticRefreshQueueService, ) {} async listComments(tenantId: string, parentObjectApiName: string, parentRecordId: string) { @@ -36,7 +36,7 @@ export class CommentService { console.log( `[Knowledge] Comment created: ${dto.parentObjectApiName}:${dto.parentRecordId} by ${userId}`, ); - await this.semanticOrchestratorService.refreshRecord( + await this.semanticRefreshQueue.enqueue( tenantId, dto.parentObjectApiName, dto.parentRecordId, @@ -69,7 +69,7 @@ export class CommentService { console.log( `[Knowledge] Comment updated: ${existing.parent_object_api_name}:${existing.parent_record_id} by ${userId}`, ); - await this.semanticOrchestratorService.refreshRecord( + await this.semanticRefreshQueue.enqueue( tenantId, existing.parent_object_api_name, existing.parent_record_id, @@ -97,7 +97,7 @@ export class CommentService { console.log( `[Knowledge] Comment deleted: ${existing.parent_object_api_name}:${existing.parent_record_id} by ${userId}`, ); - await this.semanticOrchestratorService.refreshRecord( + await this.semanticRefreshQueue.enqueue( tenantId, existing.parent_object_api_name, existing.parent_record_id, diff --git a/backend/src/knowledge/services/semantic-refresh-queue.service.ts b/backend/src/knowledge/services/semantic-refresh-queue.service.ts new file mode 100644 index 0000000..37c5a73 --- /dev/null +++ b/backend/src/knowledge/services/semantic-refresh-queue.service.ts @@ -0,0 +1,42 @@ +import { Injectable, Logger } from '@nestjs/common'; +import { InjectQueue } from '@nestjs/bullmq'; +import { Queue } from 'bullmq'; +import { + SEMANTIC_REFRESH_QUEUE, + SEMANTIC_REFRESH_JOB, +} from '../semantic-refresh.constants'; +import { SemanticRefreshJobData } from '../semantic-refresh.processor'; + +@Injectable() +export class SemanticRefreshQueueService { + private readonly logger = new Logger(SemanticRefreshQueueService.name); + + constructor( + @InjectQueue(SEMANTIC_REFRESH_QUEUE) private readonly queue: Queue, + ) {} + + async enqueue( + tenantId: string, + objectApiName: string, + recordId: string, + userId?: string, + trigger: string = 'manual', + ): Promise { + const data: SemanticRefreshJobData = { + tenantId, + objectApiName, + recordId, + userId, + trigger, + }; + await this.queue.add(SEMANTIC_REFRESH_JOB, data, { + attempts: 3, + backoff: { type: 'exponential', delay: 2000 }, + removeOnComplete: 100, + removeOnFail: 50, + }); + this.logger.debug( + `Enqueued semantic refresh: ${objectApiName}:${recordId} trigger=${trigger}`, + ); + } +} diff --git a/backend/src/object/object.service.ts b/backend/src/object/object.service.ts index dfd08d7..d4bf5b9 100644 --- a/backend/src/object/object.service.ts +++ b/backend/src/object/object.service.ts @@ -9,7 +9,7 @@ import { FieldDefinition } from '../models/field-definition.model'; import { User } from '../models/user.model'; import { ObjectMetadata } from './models/dynamic-model.factory'; import { MeilisearchService } from '../search/meilisearch.service'; -import { SemanticOrchestratorService } from '../knowledge/services/semantic-orchestrator.service'; +import { SemanticRefreshQueueService } from '../knowledge/services/semantic-refresh-queue.service'; type SearchFilter = { field: string; @@ -40,7 +40,7 @@ export class ObjectService { private modelService: ModelService, private authService: AuthorizationService, private meilisearchService: MeilisearchService, - private semanticOrchestratorService: SemanticOrchestratorService, + private semanticRefreshQueue: SemanticRefreshQueueService, ) {} // Setup endpoints - Object metadata management @@ -1130,7 +1130,7 @@ export class ObjectService { ); const record = await boundModel.query().insert(normalizedRecordData); await this.indexRecord(resolvedTenantId, objectApiName, objectDefModel.fields, record); - await this.semanticOrchestratorService.refreshRecord( + await this.semanticRefreshQueue.enqueue( resolvedTenantId, objectApiName, record.id, @@ -1206,7 +1206,7 @@ export class ObjectService { await boundModel.query().patch(normalizedEditableData).where({ id: recordId }); const record = await boundModel.query().where({ id: recordId }).first(); await this.indexRecord(resolvedTenantId, objectApiName, objectDefModel.fields, record); - await this.semanticOrchestratorService.refreshRecord( + await this.semanticRefreshQueue.enqueue( resolvedTenantId, objectApiName, recordId,