import { NestFactory } from '@nestjs/core';
import {
  FastifyAdapter,
  NestFastifyApplication,
} from '@nestjs/platform-fastify';
import { ValidationPipe, Logger } from '@nestjs/common';
import type { IncomingMessage, Server as HttpServer } from 'http';
import type { Duplex } from 'stream';
import { AppModule } from './app.module';
import { VoiceService } from './voice/voice.service';
import { AudioConverterService } from './voice/audio-converter.service';

/** Path on which media-stream WebSocket upgrade requests are accepted. */
const MEDIA_STREAM_PATH = '/api/voice/media-stream';

/** Returns a loggable message for a caught value of unknown type. */
function describeError(err: unknown): string {
  return err instanceof Error ? err.message : String(err);
}

/**
 * Starts the HTTP application and attaches the media-stream WebSocket endpoint.
 *
 * Steps, in order:
 *  1. Create the Nest application on the Fastify adapter.
 *  2. Install the global validation pipe, CORS and the `api` route prefix.
 *  3. Listen on `PORT` (default 3000) on all interfaces.
 *  4. Hook the underlying HTTP server's `upgrade` event so that requests for
 *     `/api/voice/media-stream` are handed to a `ws` server running in
 *     `noServer` mode.
 */
async function bootstrap(): Promise<void> {
  const app = await NestFactory.create<NestFastifyApplication>(
    AppModule,
    new FastifyAdapter({ logger: true }),
  );

  // Global validation pipe
  app.useGlobalPipes(
    new ValidationPipe({
      transform: true,
      whitelist: true,
      forbidNonWhitelisted: true,
    }),
  );

  // Enable CORS
  // NOTE(review): `origin: true` combined with `credentials: true` looks like it
  // permits credentialed requests from any origin — confirm this is intended
  // outside of development.
  app.enableCors({
    origin: true,
    credentials: true,
  });

  // Global prefix
  app.setGlobalPrefix('api');

  // `||` (not `??`) on purpose: an empty PORT variable also falls back to 3000.
  const port = process.env.PORT || 3000;
  await app.listen(port, '0.0.0.0');

  // After the app is listening, register the WebSocket handler.
  const fastifyInstance = app.getHttpAdapter().getInstance();
  const logger = new Logger('MediaStreamWS');
  const voiceService = app.get(VoiceService);
  const audioConverter = app.get(AudioConverterService);

  // `ws` is loaded via require, exactly as before, so no assumption is made
  // about which `ws` version or typings are installed.
  // eslint-disable-next-line @typescript-eslint/no-var-requires
  const WebSocketServer = require('ws').Server;
  const wss = new WebSocketServer({ noServer: true });

  // WebSocket upgrades are handled at the raw HTTP server level.
  const server = fastifyInstance.server as HttpServer;

  // Track active Media Streams connections: streamSid -> WebSocket
  const mediaStreams = new Map<string, unknown>();

  /**
   * Wires the message/close/error handlers for one media-stream socket.
   *
   * Per-connection state (stream id, call id, tenant, packet counter) lives in
   * this closure. The OpenAI connection opened on `start` is released on
   * `stop` OR on socket close, whichever comes first, and only once.
   *
   * @param ws - socket produced by `wss.handleUpgrade`; typed `any` because the
   *   `ws` typings are not imported in this file.
   */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  function handleMediaStreamSocket(ws: any): void {
    let streamSid: string | null = null;
    let callSid: string | null = null;
    let tenantDomain: string | null = null;
    let mediaPacketCount = 0;
    // Guards against a second cleanup when `stop` is followed by `close`.
    let openAiReleased = false;

    /** Releases the OpenAI connection for the current call, at most once. */
    const releaseOpenAI = async (): Promise<void> => {
      const activeCallSid = callSid;
      if (!activeCallSid || openAiReleased) {
        return;
      }
      openAiReleased = true;
      try {
        logger.log(`Cleaning up OpenAI connection for call ${activeCallSid}...`);
        await voiceService.cleanupOpenAIConnection(activeCallSid);
        logger.log(`✓ OpenAI connection cleaned up`);
      } catch (error: unknown) {
        logger.error(`Failed to cleanup OpenAI: ${describeError(error)}`);
      }
    };

    /** Handles the `start` event: records ids and opens the OpenAI session. */
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    const onStart = async (msg: any): Promise<void> => {
      const startedStreamSid: string = msg.streamSid;
      const startedCallSid: string = msg.start.callSid;
      // NOTE(review): falling back to a fixed tenant when none is supplied may be
      // unsafe in a multi-tenant deployment — confirm this default is wanted.
      const tenant: string = msg.start.customParameters?.tenantId || 'tenant1';

      streamSid = startedStreamSid;
      callSid = startedCallSid;
      tenantDomain = tenant;
      openAiReleased = false;

      logger.log(`=== MEDIA STREAM EVENT: START ===`);
      logger.log(`StreamSid: ${startedStreamSid}`);
      logger.log(`CallSid: ${startedCallSid}`);
      logger.log(`Tenant: ${tenant}`);
      logger.log(`MediaFormat: ${JSON.stringify(msg.start.mediaFormat)}`);

      mediaStreams.set(startedStreamSid, ws);
      logger.log(
        `Stored WebSocket for streamSid: ${startedStreamSid}. Total active streams: ${mediaStreams.size}`,
      );

      // Initialize OpenAI Realtime connection.
      // NOTE(review): `media` events that arrive while this await is pending are
      // forwarded immediately; verify sendAudioToOpenAI tolerates that.
      logger.log(`Initializing OpenAI Realtime for call ${startedCallSid}...`);
      try {
        await voiceService.initializeOpenAIRealtime({
          callSid: startedCallSid,
          tenantId: tenant,
          userId: msg.start.customParameters?.userId || 'system',
        });
        logger.log(`✓ OpenAI Realtime initialized for call ${startedCallSid}`);
      } catch (error: unknown) {
        logger.error(`Failed to initialize OpenAI: ${describeError(error)}`);
      }
    };

    /** Handles a `media` event: converts the payload and forwards it. */
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    const onMedia = async (msg: any): Promise<void> => {
      mediaPacketCount++;

      // Only log every 500 packets to reduce noise
      if (mediaPacketCount % 500 === 0) {
        logger.log(
          `Received media packet #${mediaPacketCount} for StreamSid: ${streamSid}`,
        );
      }

      const activeCallSid = callSid;
      if (!activeCallSid || !tenantDomain) {
        logger.warn('Received media before start event');
        return;
      }

      try {
        // Convert Twilio audio (μ-law 8kHz) to OpenAI format (PCM16 24kHz)
        const twilioAudio = msg.media.payload;
        const openaiAudio = audioConverter.twilioToOpenAI(twilioAudio);

        // Send audio to OpenAI Realtime API
        await voiceService.sendAudioToOpenAI(activeCallSid, openaiAudio);
      } catch (error: unknown) {
        logger.error(`Error processing media: ${describeError(error)}`);
      }
    };

    /** Handles the `stop` event: forgets the stream and releases OpenAI. */
    const onStop = async (): Promise<void> => {
      logger.log(`=== MEDIA STREAM EVENT: STOP ===`);
      logger.log(`StreamSid: ${streamSid}`);
      logger.log(`Total media packets received: ${mediaPacketCount}`);

      if (streamSid) {
        mediaStreams.delete(streamSid);
        logger.log(`Removed WebSocket for streamSid: ${streamSid}`);
      }

      await releaseOpenAI();
    };

    ws.on('message', async (message: Buffer) => {
      // The whole handler is wrapped so a malformed frame (bad JSON, missing
      // fields) is logged instead of becoming an unhandled rejection.
      try {
        const msg = JSON.parse(message.toString());

        switch (msg.event) {
          case 'connected':
            logger.log('=== MEDIA STREAM EVENT: CONNECTED ===');
            logger.log(`Protocol: ${msg.protocol}`);
            logger.log(`Version: ${msg.version}`);
            break;

          case 'start':
            await onStart(msg);
            break;

          case 'media':
            await onMedia(msg);
            break;

          case 'stop':
            await onStop();
            break;

          default:
            logger.debug(`Unknown media stream event: ${msg.event}`);
        }
      } catch (error: unknown) {
        logger.error(
          `Error processing media stream message: ${describeError(error)}`,
        );
      }
    });

    ws.on('close', () => {
      logger.log(`=== MEDIA STREAM WEBSOCKET CLOSED ===`);
      if (streamSid) {
        mediaStreams.delete(streamSid);
      }
      // A socket can close without a preceding `stop` event; release the OpenAI
      // connection here too so it is not leaked. releaseOpenAI never rejects.
      void releaseOpenAI();
    });

    ws.on('error', (error: Error) => {
      logger.error(`=== MEDIA STREAM WEBSOCKET ERROR ===`);
      logger.error(`Error message: ${error.message}`);
    });
  }

  server.on(
    'upgrade',
    (request: IncomingMessage, socket: Duplex, head: Buffer) => {
      // Compare the path only, so a query string does not defeat the match.
      const pathname = (request.url ?? '').split('?')[0];
      if (pathname !== MEDIA_STREAM_PATH) {
        // Other paths are left untouched so that any other `upgrade` listener on
        // this server can serve them.
        // NOTE(review): if no such listener exists, these sockets stay open until
        // the client gives up — consider `socket.destroy()` here.
        return;
      }

      logger.log('=== MEDIA STREAM WEBSOCKET UPGRADE REQUEST ===');
      logger.log(`Path: ${request.url}`);

      // eslint-disable-next-line @typescript-eslint/no-explicit-any
      wss.handleUpgrade(request, socket, head, (ws: any) => {
        logger.log('=== MEDIA STREAM WEBSOCKET UPGRADED SUCCESSFULLY ===');
        handleMediaStreamSocket(ws);
      });
    },
  );

  console.log(`🚀 Application is running on: http://localhost:${port}/api`);
}

// Surface startup failures explicitly instead of leaving a floating promise.
bootstrap().catch((error: unknown) => {
  new Logger('Bootstrap').error(
    `Application failed to start: ${describeError(error)}`,
  );
  process.exit(1);
});