WIP - Semantic linking working asynchronously

This commit is contained in:
Francisco Gaona
2026-04-12 11:24:03 +02:00
parent efa57c4ba8
commit 12a82372f4
7 changed files with 124 additions and 12 deletions

View File

@@ -1,5 +1,6 @@
import { Module } from '@nestjs/common';
import { ConfigModule } from '@nestjs/config';
import { BullModule } from '@nestjs/bullmq';
import { PrismaModule } from './prisma/prisma.module';
import { TenantModule } from './tenant/tenant.module';
import { AuthModule } from './auth/auth.module';
@@ -17,6 +18,12 @@ import { KnowledgeModule } from './knowledge/knowledge.module';
ConfigModule.forRoot({
isGlobal: true,
}),
BullModule.forRoot({
connection: {
host: process.env.REDIS_HOST || 'platform-redis',
port: parseInt(process.env.REDIS_PORT || '6379', 10),
},
}),
PrismaModule,
TenantModule,
AuthModule,

View File

@@ -1,16 +1,31 @@
import { Module } from '@nestjs/common';
import { BullModule } from '@nestjs/bullmq';
import { KnowledgeController } from './knowledge.controller';
import { CommentService } from './services/comment.service';
import { SemanticOrchestratorService } from './services/semantic-orchestrator.service';
import { SemanticChunkerService } from './services/semantic-chunker.service';
import { SemanticLinkService } from './services/semantic-link.service';
import { SemanticRefreshQueueService } from './services/semantic-refresh-queue.service';
import { SemanticRefreshProcessor } from './semantic-refresh.processor';
import { TenantModule } from '../tenant/tenant.module';
import { MeilisearchModule } from '../search/meilisearch.module';
import { SEMANTIC_REFRESH_QUEUE } from './semantic-refresh.constants';
@Module({
imports: [TenantModule, MeilisearchModule],
imports: [
TenantModule,
MeilisearchModule,
BullModule.registerQueue({ name: SEMANTIC_REFRESH_QUEUE }),
],
controllers: [KnowledgeController],
providers: [CommentService, SemanticOrchestratorService, SemanticChunkerService, SemanticLinkService],
exports: [SemanticOrchestratorService],
providers: [
CommentService,
SemanticOrchestratorService,
SemanticChunkerService,
SemanticLinkService,
SemanticRefreshQueueService,
SemanticRefreshProcessor,
],
exports: [SemanticOrchestratorService, SemanticRefreshQueueService],
})
export class KnowledgeModule {}

View File

@@ -0,0 +1,3 @@
export const SEMANTIC_REFRESH_QUEUE = 'semantic-refresh';
export const SEMANTIC_REFRESH_JOB = 'refresh-record';

View File

@@ -0,0 +1,45 @@
import { Processor, WorkerHost } from '@nestjs/bullmq';
import { Logger } from '@nestjs/common';
import { Job } from 'bullmq';
import { SemanticOrchestratorService } from './services/semantic-orchestrator.service';
import { SEMANTIC_REFRESH_QUEUE } from './semantic-refresh.constants';
export type SemanticRefreshJobData = {
tenantId: string;
objectApiName: string;
recordId: string;
userId?: string;
trigger: string;
};
@Processor(SEMANTIC_REFRESH_QUEUE)
export class SemanticRefreshProcessor extends WorkerHost {
private readonly logger = new Logger(SemanticRefreshProcessor.name);
constructor(
private readonly semanticOrchestratorService: SemanticOrchestratorService,
) {
super();
}
async process(job: Job<SemanticRefreshJobData>): Promise<void> {
const { tenantId, objectApiName, recordId, userId, trigger } = job.data;
this.logger.log(
`Processing semantic refresh: ${objectApiName}:${recordId} trigger=${trigger}`,
);
try {
await this.semanticOrchestratorService.refreshRecord(
tenantId,
objectApiName,
recordId,
userId,
trigger,
);
} catch (error) {
this.logger.error(
`Semantic refresh failed: ${objectApiName}:${recordId} trigger=${trigger} error=${error.message}`,
);
throw error; // Let BullMQ handle retries
}
}
}

View File

@@ -1,13 +1,13 @@
import { ForbiddenException, Injectable, NotFoundException } from '@nestjs/common';
import { TenantDatabaseService } from '../../tenant/tenant-database.service';
import { CreateCommentDto, UpdateCommentDto } from '../dto/comment.dto';
import { SemanticOrchestratorService } from './semantic-orchestrator.service';
import { SemanticRefreshQueueService } from './semantic-refresh-queue.service';
@Injectable()
export class CommentService {
constructor(
private readonly tenantDbService: TenantDatabaseService,
private readonly semanticOrchestratorService: SemanticOrchestratorService,
private readonly semanticRefreshQueue: SemanticRefreshQueueService,
) {}
async listComments(tenantId: string, parentObjectApiName: string, parentRecordId: string) {
@@ -36,7 +36,7 @@ export class CommentService {
console.log(
`[Knowledge] Comment created: ${dto.parentObjectApiName}:${dto.parentRecordId} by ${userId}`,
);
await this.semanticOrchestratorService.refreshRecord(
await this.semanticRefreshQueue.enqueue(
tenantId,
dto.parentObjectApiName,
dto.parentRecordId,
@@ -69,7 +69,7 @@ export class CommentService {
console.log(
`[Knowledge] Comment updated: ${existing.parent_object_api_name}:${existing.parent_record_id} by ${userId}`,
);
await this.semanticOrchestratorService.refreshRecord(
await this.semanticRefreshQueue.enqueue(
tenantId,
existing.parent_object_api_name,
existing.parent_record_id,
@@ -97,7 +97,7 @@ export class CommentService {
console.log(
`[Knowledge] Comment deleted: ${existing.parent_object_api_name}:${existing.parent_record_id} by ${userId}`,
);
await this.semanticOrchestratorService.refreshRecord(
await this.semanticRefreshQueue.enqueue(
tenantId,
existing.parent_object_api_name,
existing.parent_record_id,

View File

@@ -0,0 +1,42 @@
import { Injectable, Logger } from '@nestjs/common';
import { InjectQueue } from '@nestjs/bullmq';
import { Queue } from 'bullmq';
import {
SEMANTIC_REFRESH_QUEUE,
SEMANTIC_REFRESH_JOB,
} from '../semantic-refresh.constants';
import { SemanticRefreshJobData } from '../semantic-refresh.processor';
@Injectable()
export class SemanticRefreshQueueService {
private readonly logger = new Logger(SemanticRefreshQueueService.name);
constructor(
@InjectQueue(SEMANTIC_REFRESH_QUEUE) private readonly queue: Queue,
) {}
async enqueue(
tenantId: string,
objectApiName: string,
recordId: string,
userId?: string,
trigger: string = 'manual',
): Promise<void> {
const data: SemanticRefreshJobData = {
tenantId,
objectApiName,
recordId,
userId,
trigger,
};
await this.queue.add(SEMANTIC_REFRESH_JOB, data, {
attempts: 3,
backoff: { type: 'exponential', delay: 2000 },
removeOnComplete: 100,
removeOnFail: 50,
});
this.logger.debug(
`Enqueued semantic refresh: ${objectApiName}:${recordId} trigger=${trigger}`,
);
}
}

View File

@@ -9,7 +9,7 @@ import { FieldDefinition } from '../models/field-definition.model';
import { User } from '../models/user.model';
import { ObjectMetadata } from './models/dynamic-model.factory';
import { MeilisearchService } from '../search/meilisearch.service';
import { SemanticOrchestratorService } from '../knowledge/services/semantic-orchestrator.service';
import { SemanticRefreshQueueService } from '../knowledge/services/semantic-refresh-queue.service';
type SearchFilter = {
field: string;
@@ -40,7 +40,7 @@ export class ObjectService {
private modelService: ModelService,
private authService: AuthorizationService,
private meilisearchService: MeilisearchService,
private semanticOrchestratorService: SemanticOrchestratorService,
private semanticRefreshQueue: SemanticRefreshQueueService,
) {}
// Setup endpoints - Object metadata management
@@ -1130,7 +1130,7 @@ export class ObjectService {
);
const record = await boundModel.query().insert(normalizedRecordData);
await this.indexRecord(resolvedTenantId, objectApiName, objectDefModel.fields, record);
await this.semanticOrchestratorService.refreshRecord(
await this.semanticRefreshQueue.enqueue(
resolvedTenantId,
objectApiName,
record.id,
@@ -1206,7 +1206,7 @@ export class ObjectService {
await boundModel.query().patch(normalizedEditableData).where({ id: recordId });
const record = await boundModel.query().where({ id: recordId }).first();
await this.indexRecord(resolvedTenantId, objectApiName, objectDefModel.fields, record);
await this.semanticOrchestratorService.refreshRecord(
await this.semanticRefreshQueue.enqueue(
resolvedTenantId,
objectApiName,
recordId,