Add phased knowledge layer: comments and semantic linking pipeline

This commit is contained in:
phyroslam
2026-04-11 11:40:01 -07:00
parent baf3997fb6
commit df183230d8
16 changed files with 1020 additions and 1 deletion

View File

@@ -0,0 +1,225 @@
import { Injectable, Logger } from '@nestjs/common';
import { TenantDatabaseService } from '../../tenant/tenant-database.service';
import { MeilisearchService } from '../../search/meilisearch.service';
import {
DefaultSemanticProjectionAdapter,
SemanticProjectionAdapter,
} from '../adapters/semantic-projection.adapter';
import { SemanticChunkerService } from './semantic-chunker.service';
import { SemanticLinkService } from './semantic-link.service';
@Injectable()
export class SemanticOrchestratorService {
  private readonly logger = new Logger(SemanticOrchestratorService.name);

  /**
   * Projection adapters, checked in order; the first whose supports()
   * returns true handles the object type. Only the default adapter is
   * registered at present.
   */
  private readonly adapters: SemanticProjectionAdapter[] = [
    new DefaultSemanticProjectionAdapter(),
  ];

  constructor(
    private readonly tenantDbService: TenantDatabaseService,
    private readonly meilisearchService: MeilisearchService,
    private readonly chunkerService: SemanticChunkerService,
    private readonly semanticLinkService: SemanticLinkService,
  ) {}

  /**
   * Rebuild the semantic knowledge artifacts for one record: upsert its
   * semantic document, regenerate text chunks, push the chunks to the
   * tenant's Meilisearch index, and refresh suggested links.
   *
   * @param tenantId      Raw or already-resolved tenant identifier.
   * @param objectApiName API name of the object definition the record belongs to.
   * @param recordId      Primary key of the record in the object's table.
   * @param userId        Optional user to attribute suggested links to.
   * @param trigger       Free-form label describing what initiated the refresh.
   * @returns `{ skipped: true }` when the definition/record is missing or no
   *          adapter applies; otherwise `{ documentId, chunkCount }`.
   */
  async refreshRecord(
    tenantId: string,
    objectApiName: string,
    recordId: string,
    userId?: string,
    trigger: string = 'manual',
  ) {
    const resolvedTenantId = await this.tenantDbService.resolveTenantId(tenantId);
    const knex = await this.tenantDbService.getTenantKnexById(resolvedTenantId);

    const objectDefinition = await knex('object_definitions')
      .where({ apiName: objectApiName })
      .first();
    if (!objectDefinition) {
      this.logger.warn(`Object definition ${objectApiName} not found. Skipping semantic refresh.`);
      return { skipped: true };
    }

    const tableName = this.getTableName(objectDefinition);
    const record = await knex(tableName).where({ id: recordId }).first();
    if (!record) {
      return { skipped: true };
    }

    const comments = await knex('comments')
      .where({
        parent_object_api_name: objectApiName,
        parent_record_id: recordId,
      })
      .orderBy('created_at', 'asc');

    // Fix: the previous code used a non-null assertion on find(); an
    // unsupported object type would crash with an opaque TypeError. Skip
    // gracefully instead, matching the missing-definition/record paths above.
    const adapter = this.adapters.find((candidate) => candidate.supports(objectApiName));
    if (!adapter) {
      this.logger.warn(`No semantic projection adapter supports ${objectApiName}. Skipping.`);
      return { skipped: true };
    }

    const projection = adapter.buildProjection({
      objectApiName,
      record,
      objectDefinition,
      comments,
    });

    const documentId = await this.upsertSemanticDocument(knex, projection);
    const chunks = this.chunkerService.chunkText(projection.narrative, comments);
    await this.replaceChunks(knex, documentId, chunks);
    await this.indexChunks(resolvedTenantId, projection, chunks);
    await this.generateSuggestions(knex, resolvedTenantId, projection, chunks, userId, trigger);

    return { documentId, chunkCount: chunks.length };
  }

  /**
   * Re-run the semantic refresh for up to `limit` records of one object type.
   * Records are processed sequentially (deliberately not Promise.all) so a
   * large batch does not hammer the tenant DB and search index.
   */
  async reindexObject(tenantId: string, objectApiName: string, userId?: string, limit = 250) {
    const resolvedTenantId = await this.tenantDbService.resolveTenantId(tenantId);
    const knex = await this.tenantDbService.getTenantKnexById(resolvedTenantId);
    const objectDefinition = await knex('object_definitions')
      .where({ apiName: objectApiName })
      .first();
    if (!objectDefinition) {
      return { total: 0, processed: 0 };
    }
    const tableName = this.getTableName(objectDefinition);
    const records = await knex(tableName).select('id').limit(limit);
    let processed = 0;
    for (const record of records) {
      // NOTE(review): refreshRecord resolves the tenant id again; assumes
      // resolveTenantId is idempotent on an already-resolved id — confirm.
      await this.refreshRecord(resolvedTenantId, objectApiName, record.id, userId, 'batch_reindex');
      processed += 1;
    }
    return { total: records.length, processed };
  }

  /**
   * Insert or update the `semantic_documents` row for a projection, keyed by
   * (entity_type, entity_id). Returns the document id in both cases.
   * The ternary at the end handles both `returning('id')` shapes (bare id
   * string vs. `{ id }` row object).
   */
  private async upsertSemanticDocument(knex: any, projection: any): Promise<string> {
    const existing = await knex('semantic_documents')
      .where({ entity_type: projection.entityType, entity_id: projection.entityId })
      .first();
    if (existing) {
      await knex('semantic_documents')
        .where({ id: existing.id })
        .update({
          title: projection.title,
          narrative: projection.narrative,
          metadata: JSON.stringify(projection.metadata || {}),
          source_summary: JSON.stringify(projection.sourceSummary || {}),
          updated_at: knex.fn.now(),
        });
      return existing.id;
    }
    const [created] = await knex('semantic_documents')
      .insert({
        entity_type: projection.entityType,
        entity_id: projection.entityId,
        title: projection.title,
        narrative: projection.narrative,
        metadata: JSON.stringify(projection.metadata || {}),
        source_summary: JSON.stringify(projection.sourceSummary || {}),
        created_at: knex.fn.now(),
        updated_at: knex.fn.now(),
      })
      .returning('id');
    return typeof created === 'string' ? created : created.id;
  }

  /**
   * Replace all chunks of a semantic document. Fix: the delete and insert now
   * run inside one transaction so a mid-operation failure cannot leave the
   * document with its old chunks deleted and no replacements inserted.
   */
  private async replaceChunks(knex: any, documentId: string, chunks: any[]) {
    await knex.transaction(async (trx: any) => {
      await trx('semantic_chunks').where({ semantic_document_id: documentId }).delete();
      if (!chunks.length) return;
      await trx('semantic_chunks').insert(
        chunks.map((chunk) => ({
          semantic_document_id: documentId,
          chunk_index: chunk.chunkIndex,
          source_kind: chunk.sourceKind,
          source_ref_id: chunk.sourceRefId,
          text: chunk.text,
          metadata: JSON.stringify(chunk.metadata || {}),
          created_at: trx.fn.now(),
          updated_at: trx.fn.now(),
        })),
      );
    });
  }

  /** Push the document's chunks into the tenant's Meilisearch chunk index. */
  private async indexChunks(tenantId: string, projection: any, chunks: any[]) {
    if (!this.meilisearchService.isEnabled()) {
      return;
    }
    const indexName = this.meilisearchService.buildSemanticChunkIndexName(tenantId);
    await this.meilisearchService.upsertDocuments(
      indexName,
      chunks.map((chunk) => ({
        // Deterministic id so a re-index overwrites rather than duplicates.
        id: `${projection.entityType}:${projection.entityId}:${chunk.chunkIndex}`,
        entityType: projection.entityType,
        entityId: projection.entityId,
        title: projection.title,
        sourceKind: chunk.sourceKind,
        sourceRefId: chunk.sourceRefId,
        text: chunk.text,
      })),
    );
  }

  /**
   * Search the chunk index with the head of this document's text and record
   * "related_to" link suggestions toward other entities that match.
   * Confidence grows with the number of matching chunks, capped below 1.0.
   *
   * Fix: takes the tenant knex handle from the caller instead of re-resolving
   * the tenant and re-fetching the connection (two redundant round-trips per
   * refresh in the previous implementation).
   */
  private async generateSuggestions(
    knex: any,
    tenantId: string,
    projection: any,
    chunks: any[],
    userId?: string,
    trigger: string = 'semantic_refresh',
  ) {
    if (!this.meilisearchService.isEnabled() || !chunks.length) {
      return;
    }
    const indexName = this.meilisearchService.buildSemanticChunkIndexName(tenantId);
    // Query built from the first few chunks, truncated to keep search cheap.
    const queryText = chunks.slice(0, 3).map((chunk) => chunk.text).join(' ').slice(0, 1200);
    const search = await this.meilisearchService.searchIndex(indexName, queryText, 20);

    // Group hits by target entity, skipping matches against this document.
    // Fix: the previous grouped.get(key).push(...) dereferenced a
    // possibly-undefined value under strictNullChecks; narrow explicitly.
    const grouped = new Map<string, any[]>();
    for (const hit of search.hits || []) {
      if (hit.entityType === projection.entityType && hit.entityId === projection.entityId) {
        continue;
      }
      const key = `${hit.entityType}:${hit.entityId}`;
      const bucket = grouped.get(key);
      if (bucket) {
        bucket.push(hit);
      } else {
        grouped.set(key, [hit]);
      }
    }

    for (const [key, hits] of grouped.entries()) {
      // NOTE(review): assumes entity ids never contain ':' (key round-trip
      // via split) — holds for UUIDs; confirm for other id schemes.
      const [targetType, targetId] = key.split(':');
      // 0.3 base + 0.1 per matching chunk, capped at 0.99.
      const confidence = Math.min(0.99, 0.3 + hits.length * 0.1);
      await this.semanticLinkService.upsertSuggestedLink(knex, {
        sourceEntityType: projection.entityType,
        sourceEntityId: projection.entityId,
        targetEntityType: targetType,
        targetEntityId: targetId,
        linkType: 'related_to',
        status: 'suggested',
        origin: 'semantic',
        confidence,
        reason: `Suggested from semantic similarity (${trigger})`,
        evidence: {
          trigger,
          sourceSignals: chunks.slice(0, 2).map((chunk) => ({
            sourceKind: chunk.sourceKind,
            text: chunk.text.slice(0, 180),
          })),
          matchedChunks: hits.slice(0, 3).map((hit) => ({
            sourceKind: hit.sourceKind,
            text: String(hit.text || '').slice(0, 180),
          })),
        },
        suggestedByUserId: userId || null,
      });
    }
  }

  /**
   * Resolve the physical table for an object definition: an explicit
   * tableName wins, then a slugified pluralLabel (lowercased, non-alnum runs
   * collapsed to '_'), then a naive `${apiName}s` fallback.
   */
  private getTableName(objectDefinition: any): string {
    if (objectDefinition.tableName) return objectDefinition.tableName;
    if (objectDefinition.pluralLabel) {
      return objectDefinition.pluralLabel.toLowerCase().replace(/[^a-z0-9]+/g, '_');
    }
    return `${objectDefinition.apiName.toLowerCase()}s`;
  }
}