// main.ts — NestJS (Fastify) bootstrap with a Twilio Media Streams WebSocket
// bridge that forwards call audio to the OpenAI Realtime API.
import { ValidationPipe, Logger } from '@nestjs/common';
import { NestFactory } from '@nestjs/core';
import {
  FastifyAdapter,
  NestFastifyApplication,
} from '@nestjs/platform-fastify';

import { AppModule } from './app.module';
import { AudioConverterService } from './voice/audio-converter.service';
import { VoiceService } from './voice/voice.service';
async function bootstrap() {
|
|
const app = await NestFactory.create<NestFastifyApplication>(
|
|
AppModule,
|
|
new FastifyAdapter({ logger: true }),
|
|
);
|
|
|
|
// Global validation pipe
|
|
app.useGlobalPipes(
|
|
new ValidationPipe({
|
|
transform: true,
|
|
whitelist: true,
|
|
forbidNonWhitelisted: true,
|
|
}),
|
|
);
|
|
|
|
// Enable CORS
|
|
app.enableCors({
|
|
origin: true,
|
|
credentials: true,
|
|
});
|
|
|
|
// Global prefix
|
|
app.setGlobalPrefix('api');
|
|
|
|
const port = process.env.PORT || 3000;
|
|
await app.listen(port, '0.0.0.0');
|
|
|
|
// After app is listening, register WebSocket handler
|
|
const fastifyInstance = app.getHttpAdapter().getInstance();
|
|
const logger = new Logger('MediaStreamWS');
|
|
const voiceService = app.get(VoiceService);
|
|
const audioConverter = app.get(AudioConverterService);
|
|
|
|
const WebSocketServer = require('ws').Server;
|
|
const wss = new WebSocketServer({ noServer: true });
|
|
|
|
// Handle WebSocket upgrades at the server level
|
|
const server = (fastifyInstance.server as any);
|
|
|
|
// Track active Media Streams connections: streamSid -> WebSocket
|
|
const mediaStreams: Map<string, any> = new Map();
|
|
|
|
server.on('upgrade', (request: any, socket: any, head: any) => {
|
|
if (request.url === '/api/voice/media-stream') {
|
|
logger.log('=== MEDIA STREAM WEBSOCKET UPGRADE REQUEST ===');
|
|
logger.log(`Path: ${request.url}`);
|
|
|
|
wss.handleUpgrade(request, socket, head, (ws: any) => {
|
|
logger.log('=== MEDIA STREAM WEBSOCKET UPGRADED SUCCESSFULLY ===');
|
|
handleMediaStreamSocket(ws);
|
|
});
|
|
}
|
|
});
|
|
|
|
async function handleMediaStreamSocket(ws: any) {
|
|
let streamSid: string | null = null;
|
|
let callSid: string | null = null;
|
|
let tenantDomain: string | null = null;
|
|
let mediaPacketCount = 0;
|
|
|
|
ws.on('message', async (message: Buffer) => {
|
|
try {
|
|
const msg = JSON.parse(message.toString());
|
|
|
|
switch (msg.event) {
|
|
case 'connected':
|
|
logger.log('=== MEDIA STREAM EVENT: CONNECTED ===');
|
|
logger.log(`Protocol: ${msg.protocol}`);
|
|
logger.log(`Version: ${msg.version}`);
|
|
break;
|
|
|
|
case 'start':
|
|
streamSid = msg.streamSid;
|
|
callSid = msg.start.callSid;
|
|
tenantDomain = msg.start.customParameters?.tenantId || 'tenant1';
|
|
|
|
logger.log(`=== MEDIA STREAM EVENT: START ===`);
|
|
logger.log(`StreamSid: ${streamSid}`);
|
|
logger.log(`CallSid: ${callSid}`);
|
|
logger.log(`Tenant: ${tenantDomain}`);
|
|
logger.log(`MediaFormat: ${JSON.stringify(msg.start.mediaFormat)}`);
|
|
|
|
mediaStreams.set(streamSid, ws);
|
|
logger.log(`Stored WebSocket for streamSid: ${streamSid}. Total active streams: ${mediaStreams.size}`);
|
|
|
|
// Initialize OpenAI Realtime connection
|
|
logger.log(`Initializing OpenAI Realtime for call ${callSid}...`);
|
|
try {
|
|
await voiceService.initializeOpenAIRealtime({
|
|
callSid,
|
|
tenantId: tenantDomain,
|
|
userId: msg.start.customParameters?.userId || 'system',
|
|
});
|
|
logger.log(`✓ OpenAI Realtime initialized for call ${callSid}`);
|
|
} catch (error: any) {
|
|
logger.error(`Failed to initialize OpenAI: ${error.message}`);
|
|
}
|
|
break;
|
|
|
|
case 'media':
|
|
mediaPacketCount++;
|
|
// Only log every 500 packets to reduce noise
|
|
if (mediaPacketCount % 500 === 0) {
|
|
logger.log(`Received media packet #${mediaPacketCount} for StreamSid: ${streamSid}`);
|
|
}
|
|
|
|
if (!callSid || !tenantDomain) {
|
|
logger.warn('Received media before start event');
|
|
break;
|
|
}
|
|
|
|
try {
|
|
// Convert Twilio audio (μ-law 8kHz) to OpenAI format (PCM16 24kHz)
|
|
const twilioAudio = msg.media.payload;
|
|
const openaiAudio = audioConverter.twilioToOpenAI(twilioAudio);
|
|
|
|
// Send audio to OpenAI Realtime API
|
|
await voiceService.sendAudioToOpenAI(callSid, openaiAudio);
|
|
} catch (error: any) {
|
|
logger.error(`Error processing media: ${error.message}`);
|
|
}
|
|
break;
|
|
|
|
case 'stop':
|
|
logger.log(`=== MEDIA STREAM EVENT: STOP ===`);
|
|
logger.log(`StreamSid: ${streamSid}`);
|
|
logger.log(`Total media packets received: ${mediaPacketCount}`);
|
|
|
|
if (streamSid) {
|
|
mediaStreams.delete(streamSid);
|
|
logger.log(`Removed WebSocket for streamSid: ${streamSid}`);
|
|
}
|
|
|
|
// Clean up OpenAI connection
|
|
if (callSid) {
|
|
try {
|
|
logger.log(`Cleaning up OpenAI connection for call ${callSid}...`);
|
|
await voiceService.cleanupOpenAIConnection(callSid);
|
|
logger.log(`✓ OpenAI connection cleaned up`);
|
|
} catch (error: any) {
|
|
logger.error(`Failed to cleanup OpenAI: ${error.message}`);
|
|
}
|
|
}
|
|
break;
|
|
|
|
default:
|
|
logger.debug(`Unknown media stream event: ${msg.event}`);
|
|
}
|
|
} catch (error: any) {
|
|
logger.error(`Error processing media stream message: ${error.message}`);
|
|
}
|
|
});
|
|
|
|
ws.on('close', () => {
|
|
logger.log(`=== MEDIA STREAM WEBSOCKET CLOSED ===`);
|
|
if (streamSid) {
|
|
mediaStreams.delete(streamSid);
|
|
}
|
|
});
|
|
|
|
ws.on('error', (error: Error) => {
|
|
logger.error(`=== MEDIA STREAM WEBSOCKET ERROR ===`);
|
|
logger.error(`Error message: ${error.message}`);
|
|
});
|
|
}
|
|
|
|
console.log(`🚀 Application is running on: http://localhost:${port}/api`);
|
|
}
|
|
|
|
bootstrap();
|