import { Injectable, Logger } from '@nestjs/common';
import { TenantDatabaseService } from '../tenant/tenant-database.service';
import { getCentralPrisma } from '../prisma/central-prisma.service';
import { IntegrationsConfig, TwilioConfig, OpenAIConfig } from './interfaces/integration-config.interface';
import * as Twilio from 'twilio';
import { WebSocket } from 'ws';
import { v4 as uuidv4 } from 'uuid';

const AccessToken = Twilio.jwt.AccessToken;
const VoiceGrant = AccessToken.VoiceGrant;

/** E.164 check: optional '+', non-zero leading digit, 2-15 digits in total. */
const E164_PATTERN = /^\+?[1-9]\d{1,14}$/;

/** Identifiers that start like a UUID are treated as tenant ids rather than domains. */
const UUID_PREFIX_PATTERN = /^[0-9a-f]{8}-[0-9a-f]{4}-/i;

/** Realtime model used when the tenant config does not specify one. */
const DEFAULT_REALTIME_MODEL = 'gpt-4o-realtime-preview-2024-10-01';

/** Twilio call statuses after which the call can no longer change. */
const TERMINAL_CALL_STATUSES = new Set(['completed', 'busy', 'failed', 'no-answer', 'canceled']);

/** System prompt for the listening-mode assistant that advises the agent. */
const ASSISTANT_INSTRUCTIONS = `You are an AI assistant in LISTENING MODE, helping a sales/support agent during their phone call.

IMPORTANT: You are NOT talking to the caller. You are advising the agent who is handling the call.

Your role:
- Listen to the conversation between the agent and the caller
- Provide concise, actionable suggestions to help the agent
- Recommend CRM actions (search contacts, create tasks, update records)
- Alert the agent to important information or next steps
- Keep suggestions brief (1-2 sentences max)

Format your suggestions like:
"💡 Suggestion: [your advice]"
"⚠️ Alert: [important notice]"
"📋 Action: [recommended CRM action]"`;

/** Category shown to the agent for an AI suggestion. */
type AiSuggestionType = 'response' | 'action' | 'insight';

/** Payload pushed to the agent when the assistant produces a suggestion. */
interface AiSuggestion {
  type: AiSuggestionType;
  text: string;
  callSid: string;
  timestamp: string;
}

/** The only part of the voice gateway this service calls. */
interface VoiceGatewayNotifier {
  notifyAiSuggestion(userId: string, suggestion: AiSuggestion): unknown;
}

/** Tenant row fields this service reads from the central database. */
interface TenantIntegrationRecord {
  id: string;
  integrationsConfig: unknown;
}

/** A cached Twilio client plus the credentials it was built with. */
interface CachedTwilioClient {
  client: Twilio.Twilio;
  credentialsKey: string;
}

@Injectable()
export class VoiceService {
  private readonly logger = new Logger(VoiceService.name);

  /** domain -> Twilio client, rebuilt whenever the tenant's credentials change. */
  private readonly twilioClients = new Map<string, CachedTwilioClient>();

  /** callSid -> open OpenAI Realtime socket (only set once the socket is open). */
  private readonly openaiConnections = new Map<string, WebSocket>();

  /**
   * callSid -> in-memory call state. Entries are loosely shaped (callId, tenantId,
   * userId, direction, status, streamSid, pendingAudio, ...), so they stay untyped.
   */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  private readonly callStates = new Map<string, any>();

  /** Set by the gateway at init time; injected lazily to avoid a circular dependency. */
  private voiceGateway?: VoiceGatewayNotifier;

  constructor(private readonly tenantDbService: TenantDatabaseService) {}

  /**
   * Register the gateway used to push AI suggestions to agents.
   * Called by the gateway on init.
   */
  setGateway(gateway: VoiceGatewayNotifier) {
    this.voiceGateway = gateway;
  }

  /** Look up a tenant (id + integrations config) through one of its domains. */
  private async findTenantByDomain(domain: string): Promise<TenantIntegrationRecord | null> {
    const domainRecord = await getCentralPrisma().domain.findUnique({
      where: { domain },
      include: { tenant: { select: { id: true, integrationsConfig: true } } },
    });
    return domainRecord?.tenant ?? null;
  }

  /**
   * Look up a tenant by UUID when the identifier looks like one; otherwise treat
   * the identifier as a domain.
   */
  private async resolveTenant(identifier: string): Promise<TenantIntegrationRecord | null> {
    if (UUID_PREFIX_PATTERN.test(identifier)) {
      return getCentralPrisma().tenant.findUnique({
        where: { id: identifier },
        select: { id: true, integrationsConfig: true },
      });
    }
    return this.findTenantByDomain(identifier);
  }

  /**
   * Get a Twilio client for a tenant, identified by domain.
   *
   * The tenant config is re-read on every call so that credential changes are
   * picked up. The cached client is reused only while the credentials match.
   *
   * @throws Error if the domain is unknown or Twilio credentials are missing.
   */
  private async getTwilioClient(
    tenantIdOrDomain: string,
  ): Promise<{ client: Twilio.Twilio; config: TwilioConfig; tenantId: string }> {
    this.logger.debug(`Looking up domain: ${tenantIdOrDomain}`);
    const tenant = await this.findTenantByDomain(tenantIdOrDomain);
    if (!tenant) {
      throw new Error(`Domain ${tenantIdOrDomain} not found`);
    }
    if (!tenant.integrationsConfig) {
      throw new Error('Tenant integrations config not found. Please configure Twilio credentials in Settings > Integrations');
    }

    const twilioConfig = this.getIntegrationConfig(tenant.integrationsConfig).twilio;
    if (!twilioConfig?.accountSid || !twilioConfig?.authToken) {
      throw new Error('Twilio credentials not configured for tenant');
    }
    // Never log the auth token, not even a prefix of it.
    this.logger.debug(
      `Twilio config loaded for tenant ${tenant.id} (AccountSid: ${twilioConfig.accountSid.substring(0, 10)}..., Phone: ${twilioConfig.phoneNumber})`,
    );

    const credentialsKey = `${twilioConfig.accountSid}:${twilioConfig.authToken}`;
    const cached = this.twilioClients.get(tenantIdOrDomain);
    if (cached && cached.credentialsKey === credentialsKey) {
      return { client: cached.client, config: twilioConfig, tenantId: tenant.id };
    }

    const client = Twilio.default(twilioConfig.accountSid, twilioConfig.authToken);
    this.twilioClients.set(tenantIdOrDomain, { client, credentialsKey });
    return { client, config: twilioConfig, tenantId: tenant.id };
  }

  /**
   * Normalize the stored integrations config.
   *
   * - A string is treated as encrypted and decrypted via TenantDatabaseService.
   * - A plain object is treated as already decrypted, whichever integrations it contains.
   * - Anything else yields an empty config.
   */
  private getIntegrationConfig(rawConfig: unknown): IntegrationsConfig {
    if (!rawConfig) {
      return {};
    }
    if (typeof rawConfig === 'string') {
      return this.tenantDbService.decryptIntegrationsConfig(rawConfig);
    }
    if (typeof rawConfig === 'object' && !Array.isArray(rawConfig)) {
      return rawConfig as IntegrationsConfig;
    }
    return {};
  }

  /**
   * Generate a Twilio access token (JWT, 1 hour TTL) for the browser Voice SDK.
   *
   * @param tenantDomain tenant domain used to look up Twilio credentials
   * @param userId identity embedded in the token
   * @throws Error if the API key/secret are not configured
   */
  async generateAccessToken(tenantDomain: string, userId: string): Promise<string> {
    const { config } = await this.getTwilioClient(tenantDomain);

    if (!config.accountSid || !config.apiKey || !config.apiSecret) {
      throw new Error('Twilio API credentials not configured. Please add API Key and Secret in Settings > Integrations');
    }

    const token = new AccessToken(
      config.accountSid,
      config.apiKey,
      config.apiSecret,
      { identity: userId, ttl: 3600 }, // 1 hour expiry
    );

    const voiceGrant = new VoiceGrant({
      outgoingApplicationSid: config.twimlAppSid, // TwiML App SID for outbound calls
      incomingAllow: true, // Allow incoming calls
    });
    token.addGrant(voiceGrant);

    return token.toJwt();
  }

  /**
   * Place an outbound call through Twilio and record it in the tenant database.
   *
   * `params.tenantId` is the tenant's domain: it is used both to look up
   * credentials and to build the webhook URLs Twilio calls back on.
   *
   * @throws Error on an invalid number, missing configuration, or a Twilio/DB failure
   */
  async initiateCall(params: { tenantId: string; userId: string; toNumber: string }) {
    const { tenantId: tenantDomain, userId, toNumber } = params;

    try {
      this.logger.log(`=== INITIATING CALL ===`);
      this.logger.log(`Domain: ${tenantDomain}, To: ${toNumber}, User: ${userId}`);

      if (typeof toNumber !== 'string' || !E164_PATTERN.test(toNumber)) {
        throw new Error(`Invalid phone number format: ${toNumber}. Use E.164 format (e.g., +1234567890)`);
      }

      const { client, config, tenantId } = await this.getTwilioClient(tenantDomain);
      this.logger.log(`Twilio client obtained for tenant: ${tenantId}`);

      const fromNumber = config.phoneNumber;
      if (!fromNumber) {
        throw new Error('Twilio phone number not configured');
      }
      this.logger.log(`From number: ${fromNumber}`);

      // Tenant-specific webhook URLs over HTTPS (terminated by Traefik).
      const backendUrl = `https://${tenantDomain}`;
      const twimlUrl = `${backendUrl}/api/voice/twiml/outbound?phoneNumber=${encodeURIComponent(fromNumber)}&toNumber=${encodeURIComponent(toNumber)}`;
      const statusUrl = `${backendUrl}/api/voice/webhook/status`;
      this.logger.log(`TwiML URL: ${twimlUrl}`);
      this.logger.log(`Status URL: ${statusUrl}`);

      const tenantKnex = await this.tenantDbService.getTenantKnexById(tenantId);
      const callId = uuidv4();

      this.logger.log(`Calling Twilio API...`);
      // For Device-to-Number calls the SDK handles the Device leg; the TwiML URL returns a <Dial>.
      const call = await client.calls.create({
        to: toNumber,
        from: fromNumber,
        url: twimlUrl,
        statusCallback: statusUrl,
        statusCallbackEvent: ['initiated', 'ringing', 'answered', 'completed'],
        statusCallbackMethod: 'POST',
        record: false,
        machineDetection: 'Enable', // Optional: detect answering machines
      });
      this.logger.log(`Call created successfully: ${call.sid}, Status: ${call.status}`);

      await tenantKnex('calls').insert({
        id: callId,
        call_sid: call.sid,
        direction: 'outbound',
        from_number: fromNumber,
        to_number: toNumber,
        status: 'queued',
        user_id: userId,
        created_at: tenantKnex.fn.now(),
        updated_at: tenantKnex.fn.now(),
      });

      this.callStates.set(call.sid, {
        callId,
        callSid: call.sid,
        tenantId,
        userId,
        direction: 'outbound',
        status: 'queued',
      });

      this.logger.log(`Outbound call initiated: ${call.sid}`);
      return { callId, callSid: call.sid, status: 'queued' };
    } catch (error) {
      this.logger.error('Failed to initiate call', error);
      throw error;
    }
  }

  /**
   * Mark an incoming call as accepted by `userId`.
   *
   * Twilio cannot move a call to 'in-progress' via its REST API (the call flow is
   * driven by TwiML), so only the database row and the in-memory state are updated.
   */
  async acceptCall(params: { callSid: string; tenantId: string; userId: string }) {
    const { callSid, tenantId, userId } = params;

    try {
      const tenantKnex = await this.tenantDbService.getTenantKnexById(tenantId);
      await tenantKnex('calls')
        .where({ call_sid: callSid })
        .update({
          status: 'in-progress',
          user_id: userId,
          started_at: tenantKnex.fn.now(),
          updated_at: tenantKnex.fn.now(),
        });

      const state = this.callStates.get(callSid) || {};
      this.callStates.set(callSid, { ...state, status: 'in-progress', userId });

      this.logger.log(`Call accepted: ${callSid} by user ${userId}`);
    } catch (error) {
      this.logger.error('Failed to accept call', error);
      throw error;
    }
  }

  /**
   * Reject an incoming call: hang it up in Twilio and mark it 'canceled'.
   *
   * @param tenantId tenant domain (resolved to the tenant id for the DB lookup)
   */
  async rejectCall(callSid: string, tenantId: string) {
    try {
      const { client, tenantId: resolvedTenantId } = await this.getTwilioClient(tenantId);

      await client.calls(callSid).update({ status: 'completed' });

      const tenantKnex = await this.tenantDbService.getTenantKnexById(resolvedTenantId);
      await tenantKnex('calls')
        .where({ call_sid: callSid })
        .update({
          status: 'canceled',
          updated_at: tenantKnex.fn.now(),
        });

      this.callStates.delete(callSid);
      this.logger.log(`Call rejected: ${callSid}`);
    } catch (error) {
      this.logger.error('Failed to reject call', error);
      throw error;
    }
  }

  /**
   * Hang up an active call, close its OpenAI socket, and mark it 'completed'.
   *
   * @param tenantId tenant domain (resolved to the tenant id for the DB lookup)
   */
  async endCall(callSid: string, tenantId: string) {
    try {
      const { client, tenantId: resolvedTenantId } = await this.getTwilioClient(tenantId);

      await client.calls(callSid).update({ status: 'completed' });

      await this.cleanupOpenAIConnection(callSid);

      const tenantKnex = await this.tenantDbService.getTenantKnexById(resolvedTenantId);
      await tenantKnex('calls')
        .where({ call_sid: callSid })
        .update({
          status: 'completed',
          ended_at: tenantKnex.fn.now(),
          updated_at: tenantKnex.fn.now(),
        });

      this.callStates.delete(callSid);
      this.logger.log(`Call ended: ${callSid}`);
    } catch (error) {
      this.logger.error('Failed to end call', error);
      throw error;
    }
  }

  /**
   * Send DTMF tones (not implemented yet: only validates credentials and logs).
   *
   * The Twilio REST API has no direct DTMF call; this would need a TwiML update
   * (e.g. <Play digits="...">).
   */
  async sendDtmf(callSid: string, digit: string, tenantId: string) {
    try {
      // Resolving the client still validates the tenant's Twilio configuration.
      await this.getTwilioClient(tenantId);

      this.logger.log(`DTMF requested for call ${callSid}: ${digit}`);
      // TODO: Implement DTMF sending via TwiML update
    } catch (error) {
      this.logger.error('Failed to send DTMF', error);
      throw error;
    }
  }

  /**
   * Return the call state, preferring the in-memory copy and falling back to the DB row.
   *
   * @returns the state object, the `calls` row, or null if neither exists
   */
  async getCallState(callSid: string, tenantId: string) {
    if (this.callStates.has(callSid)) {
      return this.callStates.get(callSid);
    }

    const tenantKnex = await this.tenantDbService.getTenantKnexById(tenantId);
    const call = await tenantKnex('calls').where({ call_sid: callSid }).first();
    return call || null;
  }

  /**
   * Apply a Twilio status-callback update to the DB row and in-memory state.
   *
   * On a terminal status (completed/busy/failed/no-answer/canceled) the in-memory
   * state and any OpenAI socket for the call are released, so they do not
   * accumulate for calls that end without endCall/rejectCall.
   */
  async updateCallStatus(params: {
    callSid: string;
    tenantId: string;
    status: string;
    duration?: number;
    recordingUrl?: string;
  }) {
    const { callSid, tenantId, status, duration, recordingUrl } = params;

    try {
      const tenantKnex = await this.tenantDbService.getTenantKnexById(tenantId);

      const updateData: Record<string, unknown> = {
        status,
        updated_at: tenantKnex.fn.now(),
      };
      if (duration !== undefined) {
        updateData.duration_seconds = duration;
      }
      if (recordingUrl) {
        updateData.recording_url = recordingUrl;
      }
      if (status === 'completed') {
        updateData.ended_at = tenantKnex.fn.now();
      }

      await tenantKnex('calls').where({ call_sid: callSid }).update(updateData);

      if (TERMINAL_CALL_STATUSES.has(status)) {
        await this.cleanupOpenAIConnection(callSid);
        this.callStates.delete(callSid);
      } else {
        const state = this.callStates.get(callSid);
        if (state) {
          this.callStates.set(callSid, { ...state, status });
        }
      }

      this.logger.log(`Call status updated: ${callSid} -> ${status}`);
    } catch (error) {
      this.logger.error('Failed to update call status', error);
      throw error;
    }
  }

  /**
   * Open an OpenAI Realtime session that listens to the call and advises the agent.
   *
   * `params.tenantId` may be a tenant UUID or a domain. This method is best-effort:
   * if the tenant or its OpenAI config is missing, or setup fails, it logs and
   * returns without throwing.
   */
  async initializeOpenAIRealtime(params: { callSid: string; tenantId: string; userId: string }) {
    const { callSid, tenantId, userId } = params;

    try {
      const tenant = await this.resolveTenant(tenantId);
      if (!tenant) {
        this.logger.warn(`Tenant not found for identifier: ${tenantId}`);
        return;
      }

      const openaiConfig = this.getIntegrationConfig(tenant.integrationsConfig).openai;
      if (!openaiConfig?.apiKey) {
        this.logger.warn('OpenAI not configured for tenant, skipping AI features');
        return;
      }

      const tenantUuid = tenant.id;
      const model = openaiConfig.model || DEFAULT_REALTIME_MODEL;

      // Replace, rather than leak, any socket already open for this call.
      await this.cleanupOpenAIConnection(callSid);

      const ws = new WebSocket(`wss://api.openai.com/v1/realtime?model=${encodeURIComponent(model)}`, {
        headers: {
          'Authorization': `Bearer ${openaiConfig.apiKey}`,
          'OpenAI-Beta': 'realtime=v1',
        },
      });

      ws.on('open', () => {
        this.logger.log(`OpenAI Realtime connected for call ${callSid}`);
        // Registered only once open, so senders never write to a connecting socket.
        this.openaiConnections.set(callSid, ws);

        // Merge into any existing state (callId, direction, streamSid, ...) instead of discarding it.
        const existing = this.callStates.get(callSid) ?? {};
        this.callStates.set(callSid, {
          ...existing,
          callSid,
          tenantId: tenantUuid,
          userId,
          status: 'in-progress',
        });
        this.logger.log(`📝 Stored call state for ${callSid} with userId: ${userId}`);

        ws.send(JSON.stringify({
          type: 'session.update',
          session: {
            model,
            voice: openaiConfig.voice || 'alloy',
            instructions: ASSISTANT_INSTRUCTIONS,
            turn_detection: { type: 'server_vad' },
            tools: this.getOpenAITools(),
          },
        }));
      });

      ws.on('message', (data: Buffer) => {
        // Parse here, under try/catch: an exception thrown from an emitter listener would crash the process.
        let message: unknown;
        try {
          message = JSON.parse(data.toString());
        } catch (error) {
          this.logger.warn(`Ignoring malformed OpenAI message for call ${callSid}`);
          return;
        }
        // Pass the tenant UUID (not the identifier the caller supplied).
        void this.handleOpenAIMessage(callSid, tenantUuid, userId, message);
      });

      ws.on('error', (error) => {
        this.logger.error(`OpenAI WebSocket error for call ${callSid}:`, error);
        this.forgetOpenAIConnection(callSid, ws);
      });

      ws.on('close', (code, reason) => {
        this.logger.log(`OpenAI Realtime disconnected for call ${callSid} - Code: ${code}, Reason: ${reason.toString()}`);
        this.forgetOpenAIConnection(callSid, ws);
      });
    } catch (error) {
      this.logger.error('Failed to initialize OpenAI Realtime', error);
    }
  }

  /** Drop the map entry for `callSid` only if it still points at this socket. */
  private forgetOpenAIConnection(callSid: string, ws: WebSocket): void {
    if (this.openaiConnections.get(callSid) === ws) {
      this.openaiConnections.delete(callSid);
    }
  }

  /** Append a base64 audio chunk to the call's OpenAI input buffer. */
  async sendAudioToOpenAI(callSid: string, audioBase64: string) {
    const ws = this.openaiConnections.get(callSid);
    if (!ws) {
      this.logger.warn(`No OpenAI connection for call ${callSid}`);
      return;
    }

    try {
      ws.send(JSON.stringify({ type: 'input_audio_buffer.append', audio: audioBase64 }));
    } catch (error) {
      this.logger.error(`Failed to send audio to OpenAI for call ${callSid}`, error);
    }
  }

  /** Commit the call's OpenAI input audio buffer (triggers processing). */
  async commitAudioBuffer(callSid: string) {
    const ws = this.openaiConnections.get(callSid);
    if (!ws) {
      return;
    }

    try {
      ws.send(JSON.stringify({ type: 'input_audio_buffer.commit' }));
    } catch (error) {
      this.logger.error(`Failed to commit audio buffer for call ${callSid}`, error);
    }
  }

  /** Close and forget the OpenAI socket for a call, if any. Errors are only logged. */
  async cleanupOpenAIConnection(callSid: string) {
    const ws = this.openaiConnections.get(callSid);
    if (!ws) {
      return;
    }

    try {
      ws.close();
      this.openaiConnections.delete(callSid);
      this.logger.log(`Cleaned up OpenAI connection for call ${callSid}`);
    } catch (error) {
      this.logger.error(`Error cleaning up OpenAI connection for call ${callSid}`, error);
    }
  }

  /**
   * Classify an assistant transcript.
   *
   * The emoji prefixes requested in the instructions take precedence over
   * keyword heuristics.
   */
  private classifySuggestion(text: string): AiSuggestionType {
    if (text.includes('💡')) return 'response';
    if (text.includes('📋')) return 'action';
    // '⚠' matches with or without the emoji variation selector.
    if (text.includes('⚠')) return 'insight';

    const lower = text.toLowerCase();
    if (lower.includes('suggest')) return 'response';
    if (lower.includes('action')) return 'action';
    return 'insight';
  }

  /**
   * Dispatch one OpenAI Realtime server event for a call. Errors are logged, never thrown.
   *
   * @param tenantId tenant UUID (used for DB writes)
   */
  private async handleOpenAIMessage(
    callSid: string,
    tenantId: string,
    userId: string,
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    message: any,
  ) {
    try {
      switch (message?.type) {
        case 'response.audio.delta': {
          // Buffer assistant audio only once a media stream is attached to the call.
          const state = this.callStates.get(callSid);
          if (state?.streamSid && message.delta) {
            if (!state.pendingAudio) {
              state.pendingAudio = [];
            }
            state.pendingAudio.push(message.delta);
          }
          break;
        }

        case 'response.audio_transcript.done': {
          // The final transcript carries the assistant's text suggestion.
          const transcript: unknown = message.transcript;
          this.logger.log(`💡 AI Suggestion: "${transcript}"`);

          if (typeof transcript !== 'string' || transcript.length === 0) {
            break;
          }

          await this.updateCallTranscript(callSid, tenantId, transcript);

          const state = this.callStates.get(callSid);
          this.logger.log(`📊 Call state - userId: ${state?.userId}, gateway: ${!!this.voiceGateway}`);

          if (state?.userId && this.voiceGateway) {
            this.logger.log(`📤 Sending to user ${state.userId}`);
            await this.voiceGateway.notifyAiSuggestion(state.userId, {
              type: this.classifySuggestion(transcript),
              text: transcript,
              callSid,
              timestamp: new Date().toISOString(),
            });
            this.logger.log(`✅ Suggestion sent to agent`);
          } else {
            this.logger.warn(`❌ Cannot send - userId: ${state?.userId}, gateway: ${!!this.voiceGateway}, callStates has ${this.callStates.size} entries`);
          }
          break;
        }

        case 'response.function_call_arguments.done':
          await this.handleToolCall(callSid, tenantId, userId, message);
          break;

        case 'error':
          this.logger.error(`OpenAI error for call ${callSid}: ${JSON.stringify(message.error)}`);
          break;

        // Known events that need no handling (and are too noisy to log).
        case 'conversation.item.created':
        case 'response.audio.done':
        case 'response.audio_transcript.delta':
        case 'session.created':
        case 'session.updated':
        case 'response.created':
        case 'response.output_item.added':
        case 'response.content_part.added':
        case 'response.content_part.done':
        case 'response.output_item.done':
        case 'response.done':
        case 'input_audio_buffer.speech_started':
        case 'input_audio_buffer.speech_stopped':
        case 'input_audio_buffer.committed':
        default:
          break;
      }
    } catch (error) {
      this.logger.error('Failed to handle OpenAI message', error);
    }
  }

  /** Function-tool definitions advertised to the OpenAI session for CRM actions. */
  private getOpenAITools(): Record<string, unknown>[] {
    return [
      {
        type: 'function',
        name: 'search_contact',
        description: 'Search for a contact by name, email, or phone number',
        parameters: {
          type: 'object',
          properties: {
            query: { type: 'string', description: 'Search query (name, email, or phone)' },
          },
          required: ['query'],
        },
      },
      {
        type: 'function',
        name: 'create_task',
        description: 'Create a follow-up task based on the call',
        parameters: {
          type: 'object',
          properties: {
            title: { type: 'string', description: 'Task title' },
            description: { type: 'string', description: 'Task description' },
            dueDate: { type: 'string', description: 'Due date (ISO format)' },
          },
          required: ['title'],
        },
      },
      {
        type: 'function',
        name: 'update_contact',
        description: 'Update contact information',
        parameters: {
          type: 'object',
          properties: {
            contactId: { type: 'string', description: 'Contact ID' },
            fields: { type: 'object', description: 'Fields to update' },
          },
          required: ['contactId', 'fields'],
        },
      },
    ];
  }

  /**
   * Handle a completed tool call from OpenAI (not implemented yet: only logs).
   *
   * TODO: dispatch to the relevant services by tool name, respecting the user's RBAC permissions.
   */
  private async handleToolCall(
    callSid: string,
    tenantId: string,
    userId: string,
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    message: any,
  ) {
    this.logger.log(`Tool call for call ${callSid}: ${message.name}`);
  }

  /**
   * Overwrite the call's `ai_transcript` column with the latest suggestion text.
   * Failures are logged, not thrown.
   */
  private async updateCallTranscript(callSid: string, tenantId: string, transcript: string) {
    try {
      const tenantKnex = await this.tenantDbService.getTenantKnexById(tenantId);
      await tenantKnex('calls')
        .where({ call_sid: callSid })
        .update({
          ai_transcript: transcript,
          updated_at: tenantKnex.fn.now(),
        });
    } catch (error) {
      this.logger.error('Failed to update transcript', error);
    }
  }

  /** Return the user's most recent calls, newest first (at most `limit`). */
  async getCallHistory(tenantId: string, userId: string, limit = 50) {
    try {
      const tenantKnex = await this.tenantDbService.getTenantKnexById(tenantId);
      return await tenantKnex('calls')
        .where({ user_id: userId })
        .orderBy('created_at', 'desc')
        .limit(limit);
    } catch (error) {
      this.logger.error('Failed to get call history', error);
      throw error;
    }
  }
}