import {
  Controller,
  Post,
  Get,
  Body,
  Req,
  Res,
  UseGuards,
  Logger,
  Query,
} from '@nestjs/common';
import { FastifyRequest, FastifyReply } from 'fastify';
import { JwtAuthGuard } from '../auth/jwt-auth.guard';
import { VoiceService } from './voice.service';
import { VoiceGateway } from './voice.gateway';
import { AudioConverterService } from './audio-converter.service';
import { InitiateCallDto } from './dto/initiate-call.dto';
import { TenantId } from '../tenant/tenant.decorator';

/** Page size used by `GET /voice/calls` when `limit` is absent or not a positive integer. */
const DEFAULT_CALL_HISTORY_LIMIT = 50;

/** `WebSocket.OPEN` readyState value (WebSocket spec; same constant in `ws`). */
const WS_OPEN = 1;

/**
 * Minimal structural view of a `ws` WebSocket, covering only what this controller uses.
 * Declared locally so the file does not need `@types/ws` (`ws` is loaded via `require`).
 */
interface MediaStreamSocket {
  readonly readyState: number;
  send(data: string): void;
  on(event: 'message', listener: (data: Buffer) => void): void;
  on(event: 'close', listener: () => void): void;
  on(event: 'error', listener: (error: Error) => void): void;
}

/** Twilio webhook form body: every Twilio parameter arrives as an (optional) string. */
type TwilioWebhookBody = Record<string, string | undefined>;

/**
 * Escape a value for interpolation into XML text or a quoted XML attribute.
 *
 * Webhook parameters (To, From, ...), the Host header and client identities are not
 * trusted, so every dynamic TwiML value must pass through here to prevent markup injection.
 */
function escapeXml(value: unknown): string {
  return String(value ?? '')
    .replace(/&/g, '&amp;')
    .replace(/</g, '&lt;')
    .replace(/>/g, '&gt;')
    .replace(/"/g, '&quot;')
    .replace(/'/g, '&apos;');
}

/** Wrap TwiML verbs in the XML declaration and the `<Response>` root element Twilio requires. */
function buildTwiml(verbs: string): string {
  return `<?xml version="1.0" encoding="UTF-8"?>\n<Response>\n${verbs}\n</Response>`;
}

/** Human-readable message for an unknown thrown value. */
function errorMessage(error: unknown): string {
  return error instanceof Error ? error.message : String(error);
}

/** Stack trace of an unknown thrown value, if it carries one. */
function errorStack(error: unknown): string | undefined {
  return error instanceof Error ? error.stack : undefined;
}

/**
 * Voice endpoints: authenticated REST API for the browser client, Twilio TwiML/status
 * webhooks, and the Twilio Media Streams WebSocket that bridges call audio to OpenAI Realtime.
 *
 * NOTE(review): the Twilio webhooks and the media-stream upgrade are unauthenticated here.
 * Validating the `X-Twilio-Signature` header (with the tenant's auth token) is recommended —
 * TODO confirm whether a guard/middleware elsewhere already does this.
 */
@Controller('voice')
export class VoiceController {
  private readonly logger = new Logger(VoiceController.name);

  /** Active Media Streams connections, keyed by Twilio streamSid. */
  private readonly mediaStreams = new Map<string, MediaStreamSocket>();

  /** Shared `noServer` WebSocketServer used only to perform upgrades; created on first use. */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  private upgradeServer: any = null;

  constructor(
    private readonly voiceService: VoiceService,
    private readonly voiceGateway: VoiceGateway,
    private readonly audioConverter: AudioConverterService,
  ) {}

  /**
   * Initiate an outbound call via REST.
   *
   * @param body - destination number
   * @param req - authenticated request; the user id is read from `req.user`
   * @param tenantId - tenant resolved by the `@TenantId()` decorator
   * @returns `{ success: true, data }` with whatever `VoiceService.initiateCall` returns
   */
  @Post('call')
  @UseGuards(JwtAuthGuard)
  async initiateCall(
    @Body() body: InitiateCallDto,
    @Req() req: any,
    @TenantId() tenantId: string,
  ) {
    const result = await this.voiceService.initiateCall({
      tenantId,
      userId: this.getUserId(req),
      toNumber: body.toNumber,
    });
    return { success: true, data: result };
  }

  /**
   * Generate a Twilio access token for the browser client of the authenticated user.
   */
  @Get('token')
  @UseGuards(JwtAuthGuard)
  async getAccessToken(@Req() req: any, @TenantId() tenantId: string) {
    const token = await this.voiceService.generateAccessToken(tenantId, this.getUserId(req));
    return { success: true, data: { token } };
  }

  /**
   * Get the authenticated user's call history.
   *
   * @param limit - optional page size; anything that is not a positive integer falls back
   *   to {@link DEFAULT_CALL_HISTORY_LIMIT} instead of forwarding `NaN` to the service.
   */
  @Get('calls')
  @UseGuards(JwtAuthGuard)
  async getCallHistory(
    @Req() req: any,
    @TenantId() tenantId: string,
    @Query('limit') limit?: string,
  ) {
    const parsedLimit = limit ? Number.parseInt(limit, 10) : Number.NaN;
    const effectiveLimit =
      Number.isInteger(parsedLimit) && parsedLimit > 0 ? parsedLimit : DEFAULT_CALL_HISTORY_LIMIT;

    const calls = await this.voiceService.getCallHistory(
      tenantId,
      this.getUserId(req),
      effectiveLimit,
    );
    return { success: true, data: calls };
  }

  /**
   * TwiML for outbound calls placed from the browser (Twilio Device).
   *
   * Dials the `To` number using the tenant's configured Twilio number as caller ID. If the
   * number is missing or the tenant has no configured phone number, an error message is
   * spoken instead of dialing with a bogus caller ID.
   */
  @Post('twiml/outbound')
  async outboundTwiml(@Req() req: FastifyRequest, @Res() res: FastifyReply): Promise<void> {
    const body = (req.body ?? {}) as TwilioWebhookBody;
    const to = body.To;
    const from = body.From;
    const callSid = body.CallSid;

    this.logger.log(`=== TwiML OUTBOUND REQUEST RECEIVED ===`);
    this.logger.log(`CallSid: ${callSid}, Body From: ${from}, Body To: ${to}`);
    // Full body contains phone numbers; keep it out of info-level logs.
    this.logger.debug(`Full body: ${JSON.stringify(body)}`);

    try {
      const { tenantDomain } = this.resolveTenantFromHost(req);
      this.logger.log(`Extracted tenant domain: ${tenantDomain}`);

      if (!to) {
        throw new Error('Missing "To" parameter in outbound TwiML request');
      }

      let callerId: string | undefined;
      try {
        // NOTE(review): reaches into VoiceService.getTwilioClient via element access (it is
        // presumably not public). TODO expose a public "get tenant phone number" method.
        const { config } = await this.voiceService['getTwilioClient'](tenantDomain);
        callerId = config?.phoneNumber;
        this.logger.log(`Retrieved Twilio phone number for tenant: ${callerId}`);
      } catch (error: unknown) {
        this.logger.error(`Failed to get Twilio config: ${errorMessage(error)}`);
      }

      if (!callerId) {
        // Previously this fell back to callerId = To, which Twilio rejects (or worse, spoofs).
        throw new Error(`No Twilio phone number configured for tenant "${tenantDomain}"`);
      }

      this.logger.log(`Using callerId: ${callerId}, dialNumber: ${to}`);
      const twiml = buildTwiml(
        `  <Dial callerId="${escapeXml(callerId)}">\n` +
          `    <Number>${escapeXml(to)}</Number>\n` +
          `  </Dial>`,
      );
      this.logger.log(`Returning TwiML with Dial verb - callerId: ${callerId}, to: ${to}`);
      res.type('text/xml').send(twiml);
    } catch (error: unknown) {
      this.logger.error(`=== ERROR GENERATING TWIML ===`);
      this.logger.error(`Error: ${errorMessage(error)}`);
      this.logger.error(`Stack: ${errorStack(error)}`);
      res
        .type('text/xml')
        .send(buildTwiml('  <Say>An error occurred while processing your call.</Say>'));
    }
  }

  /**
   * TwiML for inbound calls.
   *
   * Rings every client of the tenant currently connected to the voice gateway and forks the
   * caller's audio to the media-stream WebSocket (passing `tenantId` as a custom parameter so
   * the stream handler can pick the right tenant). With nobody online, an "unavailable"
   * message is spoken.
   */
  @Post('twiml/inbound')
  async inboundTwiml(@Req() req: FastifyRequest, @Res() res: FastifyReply): Promise<void> {
    const body = (req.body ?? {}) as TwilioWebhookBody;
    const callSid = body.CallSid;
    const fromNumber = body.From;
    const toNumber = body.To;

    this.logger.log(`=== INBOUND CALL RECEIVED ===`);
    this.logger.log(`CallSid: ${callSid}`);
    this.logger.log(`From: ${fromNumber}`);
    this.logger.log(`To: ${toNumber}`);
    this.logger.debug(`Full body: ${JSON.stringify(body)}`);

    try {
      const { host, tenantDomain } = this.resolveTenantFromHost(req);
      this.logger.log(`Extracted tenant domain: ${tenantDomain}`);

      const connectedUsers = this.voiceGateway.getConnectedUsers(tenantDomain);
      this.logger.log(`Connected users for tenant ${tenantDomain}: ${connectedUsers.length}`);

      if (connectedUsers.length === 0) {
        this.logger.log(`❌ No users online - returning unavailable message`);
        res
          .type('text/xml')
          .send(
            buildTwiml(
              '  <Say>Sorry, no agents are currently available. Please try again later.</Say>',
            ),
          );
        return;
      }

      this.logger.log(`Client IDs to dial: ${connectedUsers.join(', ')}`);

      // wss:// because TLS is terminated in front of the app (Traefik, per original comment).
      const streamUrl = `wss://${host}/api/voice/media-stream`;
      this.logger.log(`Stream URL: ${streamUrl}`);

      // Notify connected users about the incoming call via Socket.IO.
      connectedUsers.forEach((userId) => {
        this.voiceGateway.notifyIncomingCall(userId, {
          callSid,
          fromNumber,
          toNumber,
          tenantDomain,
        });
      });

      const clientElements = connectedUsers
        .map((userId) => `    <Client>${escapeXml(userId)}</Client>`)
        .join('\n');

      // NOTE(review): <Start><Stream> forks audio one-way while <Dial> rings the clients;
      // confirm this matches the intended AI integration (bidirectional needs <Connect>).
      const twiml = buildTwiml(
        [
          '  <Start>',
          `    <Stream url="${escapeXml(streamUrl)}">`,
          `      <Parameter name="tenantId" value="${escapeXml(tenantDomain)}" />`,
          '    </Stream>',
          '  </Start>',
          '  <Dial>',
          clientElements,
          '  </Dial>',
        ].join('\n'),
      );

      this.logger.log(
        `✓ Returning inbound TwiML with Media Streams - dialing ${connectedUsers.length} client(s)`,
      );
      this.logger.debug(`Generated TwiML:\n${twiml}\n`);
      res.type('text/xml').send(twiml);
    } catch (error: unknown) {
      this.logger.error(`Error generating inbound TwiML: ${errorMessage(error)}`);
      res
        .type('text/xml')
        .send(buildTwiml('  <Say>Sorry, we are unable to connect your call at this time.</Say>'));
    }
  }

  /**
   * Twilio call status webhook. Currently only logs the reported status.
   */
  @Post('webhook/status')
  async statusWebhook(@Req() req: FastifyRequest) {
    const body = (req.body ?? {}) as TwilioWebhookBody;
    const callSid = body.CallSid;
    const status = body.CallStatus;
    const duration = body.CallDuration ? Number.parseInt(body.CallDuration, 10) : undefined;

    this.logger.log(
      `Call status webhook - CallSid: ${callSid}, Status: ${status}, Duration: ${duration}`,
    );
    // Single message string: Nest's Logger treats a second argument as the log *context*.
    this.logger.debug(`Full status webhook body: ${JSON.stringify(body)}`);

    return { success: true };
  }

  /**
   * Twilio recording webhook. Currently only logs the recording status.
   */
  @Post('webhook/recording')
  async recordingWebhook(@Req() req: FastifyRequest) {
    const body = (req.body ?? {}) as TwilioWebhookBody;
    const callSid = body.CallSid;
    const recordingSid = body.RecordingSid;
    const recordingStatus = body.RecordingStatus;

    this.logger.log(
      `Recording webhook - CallSid: ${callSid}, RecordingSid: ${recordingSid}, Status: ${recordingStatus}`,
    );

    return { success: true };
  }

  /**
   * Twilio Media Streams WebSocket endpoint.
   *
   * Handles the HTTP GET and upgrades it to a WebSocket manually with `ws`. The Fastify reply
   * is hijacked first so neither Fastify nor Nest writes an HTTP response onto a socket that
   * `ws` now owns (which would corrupt the WebSocket stream). Non-upgrade requests get 426.
   */
  @Get('media-stream')
  mediaStream(@Req() req: FastifyRequest, @Res() res: FastifyReply): void {
    this.logger.log(`=== MEDIA STREAM REQUEST ===`);
    this.logger.log(`URL: ${req.url}`);
    // Only header names: values are not needed for diagnosis and may be sensitive.
    this.logger.debug(`Headers keys: ${Object.keys(req.headers).join(', ')}`);

    const hasWebSocketKey = 'sec-websocket-key' in req.headers;
    const hasWebSocketVersion = 'sec-websocket-version' in req.headers;
    if (!hasWebSocketKey || !hasWebSocketVersion) {
      this.logger.log('Not a WebSocket upgrade request - returning');
      res.code(426).send('Upgrade Required');
      return;
    }
    this.logger.log('✓ WebSocket upgrade detected');

    // From here on `ws` writes the 101 response and owns the raw socket.
    res.hijack();

    const socket = req.raw.socket;
    if (!socket) {
      this.logger.error('Failed to get socket from req.raw');
      return;
    }

    try {
      this.getUpgradeServer().handleUpgrade(
        req.raw,
        socket,
        Buffer.alloc(0),
        (ws: MediaStreamSocket) => {
          this.logger.log('=== TWILIO MEDIA STREAM WEBSOCKET UPGRADED SUCCESSFULLY ===');
          this.handleMediaStreamSocket(ws);
        },
      );
    } catch (error: unknown) {
      this.logger.error(`=== FAILED TO UPGRADE TO WEBSOCKET ===`);
      this.logger.error(`Error message: ${errorMessage(error)}`);
      this.logger.error(`Error stack: ${errorStack(error)}`);
      // The reply is hijacked, so nothing else will close the socket.
      socket.destroy();
    }
  }

  /**
   * Send audio produced by OpenAI back to a Twilio Media Stream.
   *
   * Audio is dropped (not queued) when the stream is unknown or its socket is not OPEN.
   * NOTE(review): Twilio only plays audio sent back on bidirectional streams
   * (`<Connect><Stream>`); confirm which TwiML starts the stream this is used with.
   *
   * @param streamSid - Twilio stream identifier from the stream's `start` event
   * @param openaiAudioBase64 - base64 audio in OpenAI's output format
   */
  async sendAudioToTwilio(streamSid: string, openaiAudioBase64: string): Promise<void> {
    const ws = this.mediaStreams.get(streamSid);
    if (!ws) {
      this.logger.warn(`No Media Stream found for streamSid: ${streamSid}`);
      return;
    }
    if (ws.readyState !== WS_OPEN) {
      this.logger.debug(
        `Media Stream ${streamSid} not open (readyState ${ws.readyState}); dropping audio`,
      );
      return;
    }

    try {
      // Convert OpenAI audio (PCM16 24kHz) to Twilio format (μ-law 8kHz).
      const twilioAudio = this.audioConverter.openAIToTwilio(openaiAudioBase64);
      ws.send(JSON.stringify({ event: 'media', streamSid, media: { payload: twilioAudio } }));
    } catch (error: unknown) {
      this.logger.error(`Error sending audio to Twilio: ${errorMessage(error)}`);
    }
  }

  /** User id from the JWT payload (`userId`, falling back to `sub`). */
  private getUserId(req: any): string | undefined {
    return req.user?.userId || req.user?.sub;
  }

  /** Host header and its first label, used as the tenant domain (e.g. "tenant1"). */
  private resolveTenantFromHost(req: FastifyRequest): { host: string; tenantDomain: string } {
    const host = req.headers.host || '';
    return { host, tenantDomain: host.split('.')[0] ?? '' };
  }

  /** Lazily create the single `noServer` WebSocketServer used for manual upgrades. */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  private getUpgradeServer(): any {
    if (!this.upgradeServer) {
      // Loaded with require() as before, so no @types/ws dependency is introduced.
      // eslint-disable-next-line @typescript-eslint/no-var-requires
      const WebSocketServer = require('ws').Server;
      this.upgradeServer = new WebSocketServer({ noServer: true });
    }
    return this.upgradeServer;
  }

  /**
   * Drive one Media Stream WebSocket: register it on `start`, start the per-call OpenAI
   * Realtime session, forward converted audio on `media`, and release everything on `stop`
   * or on socket close (whichever comes first; cleanup runs once).
   */
  private handleMediaStreamSocket(ws: MediaStreamSocket): void {
    let streamSid: string | null = null;
    let callSid: string | null = null;
    let tenantDomain: string | null = null;
    let mediaPacketCount = 0;
    // null until 'start' is handled; then resolves true once OpenAI is ready, false if it
    // could not be started. Never rejects, so it is safe to await from any handler.
    let openAIReady: Promise<boolean> | null = null;
    let openAICleanedUp = false;

    /** Remove this socket from the registry (only if it is still the registered one). */
    const releaseStream = (reason: string): void => {
      if (streamSid && this.mediaStreams.get(streamSid) === ws) {
        this.mediaStreams.delete(streamSid);
        this.logger.log(
          `Removed WebSocket for streamSid: ${streamSid} (${reason}). Remaining active streams: ${this.mediaStreams.size}`,
        );
      }
    };

    /** Tear down the OpenAI session exactly once, after any in-flight initialization. */
    const cleanupOpenAI = async (): Promise<void> => {
      if (!callSid || openAICleanedUp) {
        return;
      }
      openAICleanedUp = true;
      const cleanupCallSid = callSid;
      if (openAIReady) {
        await openAIReady; // avoid racing an initialization that is still in progress
      }
      try {
        this.logger.log(`Cleaning up OpenAI connection for call ${cleanupCallSid}...`);
        await this.voiceService.cleanupOpenAIConnection(cleanupCallSid);
        this.logger.log(`✓ OpenAI connection cleaned up for call ${cleanupCallSid}`);
      } catch (error: unknown) {
        this.logger.error(
          `Failed to clean up OpenAI connection for call ${cleanupCallSid}: ${errorMessage(error)}`,
        );
      }
    };

    ws.on('message', async (message: Buffer) => {
      try {
        const msg = JSON.parse(message.toString());

        switch (msg.event) {
          case 'connected':
            this.logger.log('=== MEDIA STREAM EVENT: CONNECTED ===');
            this.logger.log(`Protocol: ${msg.protocol}`);
            this.logger.log(`Version: ${msg.version}`);
            break;

          case 'start': {
            streamSid = msg.streamSid ?? null;
            callSid = msg.start?.callSid ?? null;
            const customParameters = msg.start?.customParameters ?? {};
            // No hard-coded tenant fallback: a missing tenant must not borrow another's config.
            tenantDomain = customParameters.tenantId || null;

            this.logger.log(`=== MEDIA STREAM EVENT: START ===`);
            this.logger.log(`StreamSid: ${streamSid}`);
            this.logger.log(`CallSid: ${callSid}`);
            this.logger.log(`Tenant: ${tenantDomain}`);
            this.logger.log(`AccountSid: ${msg.start?.accountSid}`);
            this.logger.log(`MediaFormat: ${JSON.stringify(msg.start?.mediaFormat)}`);
            this.logger.log(`Custom Parameters: ${JSON.stringify(customParameters)}`);

            if (streamSid) {
              this.mediaStreams.set(streamSid, ws);
              this.logger.log(
                `Stored WebSocket for streamSid: ${streamSid}. Total active streams: ${this.mediaStreams.size}`,
              );
            }

            if (!callSid || !tenantDomain) {
              this.logger.error(
                `Media stream ${streamSid} is missing callSid or tenantId - AI processing disabled`,
              );
              openAIReady = Promise.resolve(false);
              break;
            }

            const initCallSid = callSid;
            const initTenant = tenantDomain;
            this.logger.log(`Initializing OpenAI Realtime for call ${initCallSid}...`);
            openAIReady = Promise.resolve()
              .then(() =>
                this.voiceService.initializeOpenAIRealtime({
                  callSid: initCallSid,
                  tenantId: initTenant,
                  userId: customParameters.userId || 'system',
                }),
              )
              .then(
                () => {
                  this.logger.log(`✓ OpenAI Realtime initialized for call ${initCallSid}`);
                  return true;
                },
                (error: unknown) => {
                  this.logger.error(
                    `Failed to initialize OpenAI Realtime for call ${initCallSid}: ${errorMessage(error)}`,
                  );
                  return false;
                },
              );
            await openAIReady;
            break;
          }

          case 'media': {
            mediaPacketCount++;
            const payload: string | undefined = msg.media?.payload;
            if (mediaPacketCount % 50 === 0) {
              // Log every 50th packet to avoid spam.
              this.logger.log(
                `Received media packet #${mediaPacketCount} for StreamSid: ${streamSid}, CallSid: ${callSid}, PayloadSize: ${payload?.length || 0} bytes`,
              );
            }

            if (!openAIReady) {
              this.logger.warn('Received media before start event');
              break;
            }
            const activeCallSid = callSid;
            if (!activeCallSid || !payload) {
              break;
            }

            // Convert Twilio audio (base64 μ-law 8kHz) to OpenAI format (PCM16 24kHz).
            const openaiAudio = this.audioConverter.twilioToOpenAI(payload);

            // Wait for the session; packets queued here resume in arrival order.
            if (!(await openAIReady) || openAICleanedUp) {
              break;
            }
            await this.voiceService.sendAudioToOpenAI(activeCallSid, openaiAudio);
            break;
          }

          case 'stop':
            this.logger.log(`=== MEDIA STREAM EVENT: STOP ===`);
            this.logger.log(`StreamSid: ${streamSid}`);
            this.logger.log(`Total media packets received: ${mediaPacketCount}`);
            releaseStream('stop');
            await cleanupOpenAI();
            break;

          default:
            this.logger.debug(`Unknown media stream event: ${msg.event}`);
        }
      } catch (error: unknown) {
        this.logger.error(`Error processing media stream message: ${errorMessage(error)}`);
        this.logger.error(`Stack: ${errorStack(error)}`);
      }
    });

    ws.on('close', () => {
      this.logger.log(`=== MEDIA STREAM WEBSOCKET CLOSED ===`);
      this.logger.log(`StreamSid: ${streamSid}`);
      this.logger.log(`Total media packets in this stream: ${mediaPacketCount}`);
      releaseStream('close');
      // The socket can close without a 'stop' event; release the OpenAI session too.
      void cleanupOpenAI();
    });

    ws.on('error', (error: Error) => {
      this.logger.error(`=== MEDIA STREAM WEBSOCKET ERROR ===`);
      this.logger.error(`StreamSid: ${streamSid}`);
      this.logger.error(`Error message: ${error.message}`);
      this.logger.error(`Error stack: ${error.stack}`);
    });
  }
}