import { Injectable, Logger } from '@nestjs/common';
import { TenantDatabaseService } from '../../tenant/tenant-database.service';
import { MeilisearchService } from '../../search/meilisearch.service';
import { getCentralPrisma } from '../../prisma/central-prisma.service';
import { OpenAIConfig } from '../../voice/interfaces/integration-config.interface';
import { randomUUID } from 'crypto';
import {
  DefaultSemanticProjectionAdapter,
  SemanticProjectionAdapter,
} from '../adapters/semantic-projection.adapter';
import { SemanticChunkerService } from './semantic-chunker.service';
import { SemanticLinkService } from './semantic-link.service';

/**
 * Orchestrates the semantic pipeline for tenant records:
 * project a record (plus its comments) into a semantic document, chunk it,
 * persist the document and chunks in the tenant database, index the chunks in
 * Meilisearch, and derive `related_to` link suggestions from similar chunks.
 */
@Injectable()
export class SemanticOrchestratorService {
  private readonly logger = new Logger(SemanticOrchestratorService.name);
  private readonly adapters: SemanticProjectionAdapter[] = [new DefaultSemanticProjectionAdapter()];
  private readonly defaultEmbeddingModel =
    process.env.OPENAI_EMBEDDING_MODEL || 'text-embedding-3-small';
  private readonly semanticEmbedderName = 'default';

  constructor(
    private readonly tenantDbService: TenantDatabaseService,
    private readonly meilisearchService: MeilisearchService,
    private readonly chunkerService: SemanticChunkerService,
    private readonly semanticLinkService: SemanticLinkService,
  ) {}

  /**
   * Rebuilds the semantic document, chunks, search-index entries and link
   * suggestions for a single record.
   *
   * @param tenantId - tenant identifier; resolved via `resolveTenantId` before use.
   * @param objectApiName - `apiName` of the object definition the record belongs to.
   * @param recordId - id of the record in the object's table.
   * @param userId - optional user stored as `suggestedByUserId` on generated suggestions.
   * @param trigger - free-form label written to logs and to suggestion evidence.
   * @returns `{ skipped: true }` when the object definition, the record, or a
   *   supporting projection adapter is missing; otherwise the semantic document
   *   id and the number of chunks produced.
   */
  async refreshRecord(
    tenantId: string,
    objectApiName: string,
    recordId: string,
    userId?: string,
    trigger: string = 'manual',
  ) {
    this.logger.log(
      `Semantic refresh start: ${objectApiName}:${recordId} (trigger=${trigger})`,
    );
    const resolvedTenantId = await this.tenantDbService.resolveTenantId(tenantId);
    const knex = await this.tenantDbService.getTenantKnexById(resolvedTenantId);

    const objectDefinition = await knex('object_definitions')
      .where({ apiName: objectApiName })
      .first();
    if (!objectDefinition) {
      this.logger.warn(`Object definition ${objectApiName} not found. Skipping semantic refresh.`);
      return { skipped: true };
    }

    const tableName = this.getTableName(objectDefinition);
    const record = await knex(tableName).where({ id: recordId }).first();
    if (!record) {
      this.logger.warn(`Record not found for semantic refresh: ${objectApiName}:${recordId}`);
      return { skipped: true };
    }

    const comments = await knex('comments')
      .where({
        parent_object_api_name: objectApiName,
        parent_record_id: recordId,
      })
      .orderBy('created_at', 'asc');
    this.logger.log(
      `Semantic refresh source: ${objectApiName}:${recordId} comments=${comments.length}`,
    );

    // Guarded lookup (no non-null assertion): an object type no adapter
    // supports is skipped instead of crashing on `undefined.buildProjection`.
    const adapter = this.adapters.find((candidate) => candidate.supports(objectApiName));
    if (!adapter) {
      this.logger.warn(
        `No semantic projection adapter supports ${objectApiName}. Skipping semantic refresh.`,
      );
      return { skipped: true };
    }

    const projection = adapter.buildProjection({
      objectApiName,
      record,
      objectDefinition,
      comments,
    });

    const documentId = await this.upsertSemanticDocument(knex, projection);

    // Use embeddingNarrative (plain values, no labels) so lexical noise from 'key:'
    // prefixes doesn't inflate match scores. Comments are passed separately so they
    // are not double-counted (narrative already embeds them with 'Comment N:' prefix).
    const chunks = this.chunkerService.chunkText(projection.embeddingNarrative, comments);
    this.logger.log(
      `Semantic refresh chunking: ${objectApiName}:${recordId} chunks=${chunks.length}`,
    );

    await this.replaceChunks(knex, documentId, chunks);

    const openAiConfig = await this.getOpenAiConfig(resolvedTenantId);
    const embedderReady = await this.indexChunks(resolvedTenantId, projection, chunks, openAiConfig);

    // Reuse the tenant knex handle obtained above rather than resolving it again.
    await this.generateSuggestions(
      knex,
      resolvedTenantId,
      projection,
      chunks,
      embedderReady,
      userId,
      trigger,
    );

    this.logger.log(
      `Semantic refresh complete: ${objectApiName}:${recordId} document=${documentId}`,
    );
    return { documentId, chunkCount: chunks.length };
  }

  /**
   * Runs `refreshRecord` (trigger `batch_reindex`) sequentially over up to
   * `limit` records of the given object.
   *
   * A failure on one record is logged and counted in `failed`; it does not
   * abort the remaining records.
   *
   * @returns `total` = records selected, `processed` = refreshes that completed
   *   (including ones that returned `skipped`), `failed` = refreshes that threw.
   */
  async reindexObject(tenantId: string, objectApiName: string, userId?: string, limit = 250) {
    const resolvedTenantId = await this.tenantDbService.resolveTenantId(tenantId);
    const knex = await this.tenantDbService.getTenantKnexById(resolvedTenantId);
    const objectDefinition = await knex('object_definitions')
      .where({ apiName: objectApiName })
      .first();
    if (!objectDefinition) {
      return { total: 0, processed: 0, failed: 0 };
    }

    const tableName = this.getTableName(objectDefinition);
    const records = await knex(tableName).select('id').limit(limit);

    let processed = 0;
    let failed = 0;
    for (const record of records) {
      try {
        await this.refreshRecord(resolvedTenantId, objectApiName, record.id, userId, 'batch_reindex');
        processed += 1;
      } catch (error: unknown) {
        failed += 1;
        this.logger.error(
          `Semantic reindex failed for ${objectApiName}:${record.id}: ${this.describeError(error)}`,
          error instanceof Error ? error.stack : undefined,
        );
      }
    }
    return { total: records.length, processed, failed };
  }

  /**
   * Creates or updates the `semantic_documents` row for the projection's
   * (entityType, entityId) pair and returns its id.
   *
   * NOTE(review): this is check-then-insert, so two concurrent refreshes of the
   * same entity can both insert. Confirm that a unique constraint on
   * (entity_type, entity_id) exists, or switch to an upsert.
   */
  private async upsertSemanticDocument(knex: any, projection: any): Promise<string> {
    const contentColumns = {
      title: projection.title,
      narrative: projection.narrative,
      metadata: JSON.stringify(projection.metadata || {}),
      source_summary: JSON.stringify(projection.sourceSummary || {}),
    };

    const existing = await knex('semantic_documents')
      .where({ entity_type: projection.entityType, entity_id: projection.entityId })
      .first();

    if (existing) {
      await knex('semantic_documents')
        .where({ id: existing.id })
        .update({ ...contentColumns, updated_at: knex.fn.now() });
      return existing.id;
    }

    const newId = randomUUID();
    const [created] = await knex('semantic_documents')
      .insert({
        id: newId,
        entity_type: projection.entityType,
        entity_id: projection.entityId,
        ...contentColumns,
        created_at: knex.fn.now(),
        updated_at: knex.fn.now(),
      })
      .returning('id');

    if (created && typeof created === 'object' && created.id) {
      return created.id;
    }
    // MySQL may return a numeric insert id (often 0 for UUID PKs). Always trust the generated UUID.
    return newId;
  }

  /**
   * Replaces all `semantic_chunks` rows of a document with `chunks`.
   *
   * The delete and the insert run in one transaction, so a failing insert
   * rolls back the delete instead of leaving the document with no chunks.
   */
  private async replaceChunks(knex: any, documentId: string, chunks: any[]) {
    if (!documentId) {
      this.logger.warn('Skipping chunk replace: missing semantic document id.');
      return;
    }

    await knex.transaction(async (trx: any) => {
      await trx('semantic_chunks').where({ semantic_document_id: documentId }).delete();
      if (!chunks.length) return;

      await trx('semantic_chunks').insert(
        chunks.map((chunk) => ({
          semantic_document_id: documentId,
          chunk_index: chunk.chunkIndex,
          source_kind: chunk.sourceKind,
          source_ref_id: chunk.sourceRefId,
          text: chunk.text,
          metadata: JSON.stringify(chunk.metadata || {}),
          created_at: knex.fn.now(),
          updated_at: knex.fn.now(),
        })),
      );
    });
  }

  /**
   * Upserts the chunks into the tenant's Meilisearch semantic-chunk index.
   * When an OpenAI key is available, it first ensures the OpenAI embedder.
   *
   * Document ids are `${entityType}_${entityId}_${chunkIndex}`.
   * NOTE(review): when a record shrinks to fewer chunks, documents with higher
   * chunk indexes from an earlier run are never deleted here. They stay
   * searchable and can keep producing suggestions. Consider deleting by
   * entityType/entityId before upserting.
   *
   * @returns `true` only when Meilisearch is enabled and `ensureOpenAiEmbedder` reported success.
   */
  private async indexChunks(
    tenantId: string,
    projection: any,
    chunks: any[],
    openAiConfig: OpenAIConfig | null,
  ): Promise<boolean> {
    if (!this.meilisearchService.isEnabled()) {
      this.logger.warn('Meilisearch disabled; skipping semantic chunk indexing.');
      return false;
    }

    const indexName = this.meilisearchService.buildSemanticChunkIndexName(tenantId);
    let embedderReady = false;

    if (openAiConfig?.apiKey) {
      const model = openAiConfig.embeddingModel || this.defaultEmbeddingModel;
      embedderReady = await this.meilisearchService.ensureOpenAiEmbedder(indexName, {
        embedderName: this.semanticEmbedderName,
        apiKey: openAiConfig.apiKey,
        model,
        documentTemplate: '{{doc.title}}\n{{doc.text}}',
      });
      this.logger.log(`Meilisearch embedder ensured: index=${indexName} model=${model}`);
    } else {
      this.logger.warn('OpenAI embedder not configured; semantic search will be lexical only.');
    }

    this.logger.log(`Indexing semantic chunks: index=${indexName} count=${chunks.length}`);
    await this.meilisearchService.upsertDocuments(
      indexName,
      chunks.map((chunk) => ({
        id: `${projection.entityType}_${projection.entityId}_${chunk.chunkIndex}`,
        entityType: projection.entityType,
        entityId: projection.entityId,
        title: projection.title,
        sourceKind: chunk.sourceKind,
        sourceRefId: chunk.sourceRefId,
        text: chunk.text,
      })),
    );
    return embedderReady;
  }

  /**
   * Searches the semantic-chunk index for chunks similar to this record and
   * upserts one `related_to` suggestion per distinct target entity.
   *
   * The query is the first 3 chunks joined and truncated to 1200 characters,
   * limited to 20 hits. Hits whose entityId equals the source entityId are
   * excluded. Confidence is `min(0.99, 0.3 + 0.1 * hitsForTarget)`.
   *
   * @param knex - tenant knex handle used to persist suggestions.
   * @param tenantId - already-resolved tenant id (used for the index name).
   */
  private async generateSuggestions(
    knex: any,
    tenantId: string,
    projection: any,
    chunks: any[],
    embedderReady: boolean,
    userId?: string,
    trigger: string = 'semantic_refresh',
  ) {
    if (!this.meilisearchService.isEnabled() || !chunks.length) {
      this.logger.warn(
        `Skipping suggestion generation: meili=${this.meilisearchService.isEnabled()} chunks=${chunks.length}`,
      );
      return;
    }

    const indexName = this.meilisearchService.buildSemanticChunkIndexName(tenantId);
    const queryText = chunks
      .slice(0, 3)
      .map((chunk) => chunk.text)
      .join(' ')
      .slice(0, 1200);
    this.logger.log(
      `Generating suggestions: index=${indexName} queryLen=${queryText.length} hybrid=${embedderReady}`,
    );

    const search = await this.meilisearchService.searchIndex(
      indexName,
      queryText,
      20,
      // semanticRatio:1.0 = pure vector search, no lexical component that would
      // match on shared tokens like 'name:' or 'Comment 1:' across all records.
      embedderReady ? { embedder: this.semanticEmbedderName, semanticRatio: 1.0 } : undefined,
    );
    this.logger.log(
      `Meilisearch results: index=${indexName} hits=${search.hits?.length || 0} total=${search.total}`,
    );

    // Group hits per target entity. Type and id are kept as fields rather than
    // re-parsed from a delimited key, so ids containing ':' are preserved intact.
    const grouped = new Map<string, { targetType: string; targetId: string; hits: any[] }>();
    for (const hit of search.hits || []) {
      // Skip self-links: any hit carrying the source entity's id is excluded.
      if (hit.entityId === projection.entityId) {
        continue;
      }
      const targetType = String(hit.entityType);
      const targetId = String(hit.entityId);
      const key = JSON.stringify([targetType, targetId]);
      let group = grouped.get(key);
      if (!group) {
        group = { targetType, targetId, hits: [] };
        grouped.set(key, group);
      }
      group.hits.push(hit);
    }

    for (const { targetType, targetId, hits } of grouped.values()) {
      const confidence = Math.min(0.99, 0.3 + hits.length * 0.1);
      await this.semanticLinkService.upsertSuggestedLink(knex, {
        sourceEntityType: projection.entityType,
        sourceEntityId: projection.entityId,
        targetEntityType: targetType,
        targetEntityId: targetId,
        linkType: 'related_to',
        status: 'suggested',
        origin: 'semantic',
        confidence,
        reason: `Suggested from semantic similarity (${trigger})`,
        evidence: {
          trigger,
          sourceSignals: chunks.slice(0, 2).map((chunk) => ({
            sourceKind: chunk.sourceKind,
            text: chunk.text.slice(0, 180),
          })),
          matchedChunks: hits.slice(0, 3).map((hit) => ({
            sourceKind: hit.sourceKind,
            text: String(hit.text || '').slice(0, 180),
          })),
        },
        suggestedByUserId: userId || null,
      });
    }
  }

  /**
   * Derives the physical table name for an object definition.
   * Uses `tableName` if set. Otherwise it uses `pluralLabel` lowercased, with
   * each run of characters outside [a-z0-9] replaced by '_'. As a last resort
   * it uses the lowercased `apiName` with an 's' suffix.
   */
  private getTableName(objectDefinition: any): string {
    if (objectDefinition.tableName) return objectDefinition.tableName;
    if (objectDefinition.pluralLabel) {
      return objectDefinition.pluralLabel.toLowerCase().replace(/[^a-z0-9]+/g, '_');
    }
    return `${objectDefinition.apiName.toLowerCase()}s`;
  }

  /**
   * Loads the tenant's OpenAI settings from the central tenant record's
   * `integrationsConfig`. When that field is a string, it is decrypted first.
   *
   * The tenant's apiKey takes precedence. When the tenant has no apiKey
   * (whether or not an `openai` block exists), `OPENAI_API_KEY` from the
   * environment is used. The embedding model falls back to the service default.
   *
   * @returns the resolved config, or `null` when no API key is available.
   */
  private async getOpenAiConfig(tenantId: string): Promise<OpenAIConfig | null> {
    const resolvedTenantId = await this.tenantDbService.resolveTenantId(tenantId);
    const centralPrisma = getCentralPrisma();
    const tenant = await centralPrisma.tenant.findUnique({
      where: { id: resolvedTenantId },
      select: { integrationsConfig: true },
    });

    const rawConfig = tenant?.integrationsConfig;
    const config = rawConfig
      ? typeof rawConfig === 'string'
        ? this.tenantDbService.decryptIntegrationsConfig(rawConfig)
        : rawConfig
      : null;

    const tenantOpenAi = config?.openai;
    const apiKey = tenantOpenAi?.apiKey || process.env.OPENAI_API_KEY;
    if (!apiKey) {
      return null;
    }
    return {
      apiKey,
      embeddingModel: tenantOpenAi?.embeddingModel || this.defaultEmbeddingModel,
    };
  }

  /** Formats an unknown thrown value for log output. */
  private describeError(error: unknown): string {
    return error instanceof Error ? error.message : String(error);
  }
}