import { Injectable } from '@nestjs/common'; import { Knex } from 'knex'; import { AiProcessesService } from './ai-processes.service'; import { AiProcessesStreamService } from './ai-processes.stream.service'; import { AiAssistantService } from '../ai-assistant/ai-assistant.service'; import { TenantDatabaseService } from '../tenant/tenant-database.service'; import { AiChatMessage, AiChatSession } from '../models/ai-chat.model'; import { DeepAgentOrchestrator } from './deep-agent.orchestrator'; @Injectable() export class AiProcessesOrchestratorService { constructor( private readonly processesService: AiProcessesService, private readonly streamService: AiProcessesStreamService, private readonly tenantDbService: TenantDatabaseService, private readonly aiAssistantService: AiAssistantService, ) {} private async getTenantContext(tenantId: string) { const resolvedTenantId = await this.tenantDbService.resolveTenantId(tenantId); const knex = await this.tenantDbService.getTenantKnexById(resolvedTenantId); return { knex, tenantId: resolvedTenantId }; } private async createSessionWithContext( knex: Knex, tenantId: string, userId: string, ) { return AiChatSession.query(knex).insert({ userId, }); } async createSession(tenantId: string, userId: string) { const { knex, tenantId: resolvedTenantId } = await this.getTenantContext(tenantId); return this.createSessionWithContext(knex, resolvedTenantId, userId); } async sendMessage( tenantId: string, userId: string, message: string, sessionId?: string, processId?: string, history?: { role: string; text: string }[], context?: Record, ) { const { knex, tenantId: resolvedTenantId } = await this.getTenantContext(tenantId); const session = sessionId ? await AiChatSession.query(knex).findById(sessionId) : await this.createSessionWithContext(knex, resolvedTenantId, userId); if (!session) { throw new Error('Chat session not found.'); } await AiChatMessage.query(knex).insert({ sessionId: session.id, role: 'user', content: message, }); this.streamService.emit(session.id, { type: 'agent_started' }); const processes = await this.processesService.listProcesses(resolvedTenantId); this.streamService.emit(session.id, { type: 'processes_listed', data: { count: processes.length }, }); // If no processes configured, fallback to standard AI assistant if (!processes.length) { const response = await this.aiAssistantService.handleChat( resolvedTenantId, userId, message, (history ?? []) as any, context ?? {}, ); this.streamService.emit(session.id, { type: 'final', data: { reply: response.reply, action: response.action }, }); await AiChatMessage.query(knex).insert({ sessionId: session.id, role: 'assistant', content: response.reply, }); return { sessionId: session.id, reply: response.reply, action: response.action, record: response.record, }; } // Get OpenAI credentials from tenant integrations const credentials = await this.aiAssistantService.getOpenAiConfig(resolvedTenantId); if (!credentials?.apiKey) { throw new Error('OpenAI credentials not configured for this tenant'); } // Create Deep Agent with tenant's credentials const deepAgent = new DeepAgentOrchestrator(credentials.apiKey, credentials.model); // Use Deep Agent to select the best process const processInfos = processes.map((p) => ({ id: p.id, name: p.name, description: p.description || undefined, })); const selection = await deepAgent.selectProcess( message, processInfos, history as any, ); // If we need more information or no match, respond with question if (selection.action === 'need_more_info' || selection.action === 'no_match') { const reply = selection.question || selection.reasoning || 'I\'m not sure which process to use. Could you provide more details?'; this.streamService.emit(session.id, { type: 'final', data: { reply, needsMoreInfo: true }, }); await AiChatMessage.query(knex).insert({ sessionId: session.id, role: 'assistant', content: reply, }); return { sessionId: session.id, reply, needsMoreInfo: true }; } // Process selected - find it and execute const selectedProcess = processes.find((p) => p.id === selection.processId); if (!selectedProcess) { throw new Error('Selected process not found.'); } this.streamService.emit(session.id, { type: 'process_selected', processId: selectedProcess.id, version: selectedProcess.latestVersion, data: { processName: selectedProcess.name, reasoning: selection.reasoning }, }); // Extract inputs from the message // For now, we'll use a simple approach - just pass the message as input // In a more sophisticated implementation, we'd use the deep agent to extract structured inputs const startMessage = await deepAgent.generateStartMessage( selectedProcess.name, { message }, ); this.streamService.emit(session.id, { type: 'agent_message', data: { message: startMessage }, }); await AiChatMessage.query(knex).insert({ sessionId: session.id, role: 'assistant', content: startMessage, }); const { run, result } = await this.processesService.createRun( resolvedTenantId, userId, selectedProcess.id, { message, context: context || {} }, session.id, (payload) => this.streamService.emit(session.id, payload), ); // Emit final event this.streamService.emit(session.id, { type: 'final', data: { runId: run.id, status: result.status, output: result.output, message: result.status === 'completed' ? '✅ Workflow completed successfully!' : result.status === 'error' ? `❌ Workflow failed: ${result.error?.message || 'Unknown error'}` : '⏸️ Workflow paused', }, }); await AiChatMessage.query(knex).insert({ sessionId: session.id, role: 'assistant', content: result.status === 'completed' ? '✅ Workflow completed successfully!' : result.status === 'error' ? `❌ Workflow failed: ${result.error?.message || 'Unknown error'}` : '⏸️ Workflow paused', }); return { sessionId: session.id, runId: run.id, status: result.status }; } }