WIP - twilio integration
This commit is contained in:
575
backend/src/voice/voice.service.ts
Normal file
575
backend/src/voice/voice.service.ts
Normal file
@@ -0,0 +1,575 @@
|
||||
import { Injectable, Logger } from '@nestjs/common';
|
||||
import { TenantDatabaseService } from '../tenant/tenant-database.service';
|
||||
import { getCentralPrisma } from '../prisma/central-prisma.service';
|
||||
import { IntegrationsConfig, TwilioConfig, OpenAIConfig } from './interfaces/integration-config.interface';
|
||||
import * as Twilio from 'twilio';
|
||||
import { WebSocket } from 'ws';
|
||||
import { v4 as uuidv4 } from 'uuid';
|
||||
|
||||
@Injectable()
|
||||
export class VoiceService {
|
||||
private readonly logger = new Logger(VoiceService.name);
|
||||
private twilioClients: Map<string, Twilio.Twilio> = new Map();
|
||||
private openaiConnections: Map<string, WebSocket> = new Map(); // callSid -> WebSocket
|
||||
private callStates: Map<string, any> = new Map(); // callSid -> call state
|
||||
|
||||
constructor(
|
||||
private readonly tenantDbService: TenantDatabaseService,
|
||||
) {}
|
||||
|
||||
/**
|
||||
* Get Twilio client for a tenant
|
||||
*/
|
||||
private async getTwilioClient(tenantId: string): Promise<{ client: Twilio.Twilio; config: TwilioConfig }> {
|
||||
// Check cache first
|
||||
if (this.twilioClients.has(tenantId)) {
|
||||
const centralPrisma = getCentralPrisma();
|
||||
const tenant = await centralPrisma.tenant.findUnique({
|
||||
where: { id: tenantId },
|
||||
select: { integrationsConfig: true },
|
||||
});
|
||||
|
||||
const config = this.getIntegrationConfig(tenant?.integrationsConfig as any);
|
||||
return { client: this.twilioClients.get(tenantId), config: config.twilio };
|
||||
}
|
||||
|
||||
// Fetch tenant integrations config
|
||||
const centralPrisma = getCentralPrisma();
|
||||
const tenant = await centralPrisma.tenant.findUnique({
|
||||
where: { id: tenantId },
|
||||
select: { integrationsConfig: true },
|
||||
});
|
||||
|
||||
if (!tenant?.integrationsConfig) {
|
||||
throw new Error('Tenant integrations config not found');
|
||||
}
|
||||
|
||||
const config = this.getIntegrationConfig(tenant.integrationsConfig as any);
|
||||
|
||||
if (!config.twilio?.accountSid || !config.twilio?.authToken) {
|
||||
throw new Error('Twilio credentials not configured for tenant');
|
||||
}
|
||||
|
||||
const client = Twilio.default(config.twilio.accountSid, config.twilio.authToken);
|
||||
this.twilioClients.set(tenantId, client);
|
||||
|
||||
return { client, config: config.twilio };
|
||||
}
|
||||
|
||||
/**
|
||||
* Decrypt and parse integrations config
|
||||
*/
|
||||
private getIntegrationConfig(encryptedConfig: any): IntegrationsConfig {
|
||||
if (!encryptedConfig) {
|
||||
return {};
|
||||
}
|
||||
|
||||
// If it's already decrypted (object), return it
|
||||
if (typeof encryptedConfig === 'object' && encryptedConfig.twilio) {
|
||||
return encryptedConfig;
|
||||
}
|
||||
|
||||
// If it's encrypted (string), decrypt it
|
||||
if (typeof encryptedConfig === 'string') {
|
||||
return this.tenantDbService.decryptIntegrationsConfig(encryptedConfig);
|
||||
}
|
||||
|
||||
return {};
|
||||
}
|
||||
|
||||
/**
|
||||
* Initiate outbound call
|
||||
*/
|
||||
async initiateCall(params: {
|
||||
tenantId: string;
|
||||
userId: string;
|
||||
toNumber: string;
|
||||
}) {
|
||||
const { tenantId, userId, toNumber } = params;
|
||||
|
||||
try {
|
||||
const { client, config } = await this.getTwilioClient(tenantId);
|
||||
|
||||
// Create call record in database
|
||||
const tenantKnex = await this.tenantDbService.getTenantKnexById(tenantId);
|
||||
const callId = uuidv4();
|
||||
|
||||
// Generate TwiML URL for call flow
|
||||
const twimlUrl = `${process.env.BACKEND_URL || 'http://localhost:3000'}/api/voice/twiml/outbound`;
|
||||
|
||||
// Initiate call via Twilio
|
||||
const call = await client.calls.create({
|
||||
to: toNumber,
|
||||
from: config.phoneNumber,
|
||||
url: twimlUrl,
|
||||
statusCallback: `${process.env.BACKEND_URL || 'http://localhost:3000'}/api/voice/webhook/status`,
|
||||
statusCallbackEvent: ['initiated', 'ringing', 'answered', 'completed'],
|
||||
statusCallbackMethod: 'POST',
|
||||
record: true,
|
||||
recordingStatusCallback: `${process.env.BACKEND_URL || 'http://localhost:3000'}/api/voice/webhook/recording`,
|
||||
});
|
||||
|
||||
// Store call in database
|
||||
await tenantKnex('calls').insert({
|
||||
id: callId,
|
||||
call_sid: call.sid,
|
||||
direction: 'outbound',
|
||||
from_number: config.phoneNumber,
|
||||
to_number: toNumber,
|
||||
status: 'queued',
|
||||
user_id: userId,
|
||||
created_at: tenantKnex.fn.now(),
|
||||
updated_at: tenantKnex.fn.now(),
|
||||
});
|
||||
|
||||
// Store call state in memory
|
||||
this.callStates.set(call.sid, {
|
||||
callId,
|
||||
callSid: call.sid,
|
||||
tenantId,
|
||||
userId,
|
||||
direction: 'outbound',
|
||||
status: 'queued',
|
||||
});
|
||||
|
||||
this.logger.log(`Outbound call initiated: ${call.sid}`);
|
||||
|
||||
return {
|
||||
callId,
|
||||
callSid: call.sid,
|
||||
status: 'queued',
|
||||
};
|
||||
} catch (error) {
|
||||
this.logger.error('Failed to initiate call', error);
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Accept incoming call
|
||||
*/
|
||||
async acceptCall(params: {
|
||||
callSid: string;
|
||||
tenantId: string;
|
||||
userId: string;
|
||||
}) {
|
||||
const { callSid, tenantId, userId } = params;
|
||||
|
||||
try {
|
||||
// Note: Twilio doesn't support updating call to 'in-progress' via API
|
||||
// Call status is managed by TwiML and call flow
|
||||
// We'll update our database status instead
|
||||
|
||||
// Update database
|
||||
const tenantKnex = await this.tenantDbService.getTenantKnexById(tenantId);
|
||||
await tenantKnex('calls')
|
||||
.where({ call_sid: callSid })
|
||||
.update({
|
||||
status: 'in-progress',
|
||||
user_id: userId,
|
||||
started_at: tenantKnex.fn.now(),
|
||||
updated_at: tenantKnex.fn.now(),
|
||||
});
|
||||
|
||||
// Update state
|
||||
const state = this.callStates.get(callSid) || {};
|
||||
this.callStates.set(callSid, {
|
||||
...state,
|
||||
status: 'in-progress',
|
||||
userId,
|
||||
});
|
||||
|
||||
this.logger.log(`Call accepted: ${callSid} by user ${userId}`);
|
||||
} catch (error) {
|
||||
this.logger.error('Failed to accept call', error);
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Reject incoming call
|
||||
*/
|
||||
async rejectCall(callSid: string, tenantId: string) {
|
||||
try {
|
||||
const { client } = await this.getTwilioClient(tenantId);
|
||||
|
||||
// End the call
|
||||
await client.calls(callSid).update({
|
||||
status: 'completed',
|
||||
});
|
||||
|
||||
// Update database
|
||||
const tenantKnex = await this.tenantDbService.getTenantKnexById(tenantId);
|
||||
await tenantKnex('calls')
|
||||
.where({ call_sid: callSid })
|
||||
.update({
|
||||
status: 'canceled',
|
||||
updated_at: tenantKnex.fn.now(),
|
||||
});
|
||||
|
||||
// Clean up state
|
||||
this.callStates.delete(callSid);
|
||||
|
||||
this.logger.log(`Call rejected: ${callSid}`);
|
||||
} catch (error) {
|
||||
this.logger.error('Failed to reject call', error);
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* End active call
|
||||
*/
|
||||
async endCall(callSid: string, tenantId: string) {
|
||||
try {
|
||||
const { client } = await this.getTwilioClient(tenantId);
|
||||
|
||||
// End the call
|
||||
await client.calls(callSid).update({
|
||||
status: 'completed',
|
||||
});
|
||||
|
||||
// Clean up OpenAI connection if exists
|
||||
const openaiWs = this.openaiConnections.get(callSid);
|
||||
if (openaiWs) {
|
||||
openaiWs.close();
|
||||
this.openaiConnections.delete(callSid);
|
||||
}
|
||||
|
||||
// Update database
|
||||
const tenantKnex = await this.tenantDbService.getTenantKnexById(tenantId);
|
||||
await tenantKnex('calls')
|
||||
.where({ call_sid: callSid })
|
||||
.update({
|
||||
status: 'completed',
|
||||
ended_at: tenantKnex.fn.now(),
|
||||
updated_at: tenantKnex.fn.now(),
|
||||
});
|
||||
|
||||
// Clean up state
|
||||
this.callStates.delete(callSid);
|
||||
|
||||
this.logger.log(`Call ended: ${callSid}`);
|
||||
} catch (error) {
|
||||
this.logger.error('Failed to end call', error);
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Send DTMF tones
|
||||
*/
|
||||
async sendDtmf(callSid: string, digit: string, tenantId: string) {
|
||||
try {
|
||||
const { client } = await this.getTwilioClient(tenantId);
|
||||
|
||||
// Twilio doesn't support sending DTMF directly via API
|
||||
// This would need to be handled via TwiML <Play> of DTMF tones
|
||||
this.logger.log(`DTMF requested for call ${callSid}: ${digit}`);
|
||||
|
||||
// TODO: Implement DTMF sending via TwiML update
|
||||
} catch (error) {
|
||||
this.logger.error('Failed to send DTMF', error);
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get call state
|
||||
*/
|
||||
async getCallState(callSid: string, tenantId: string) {
|
||||
// Try memory first
|
||||
if (this.callStates.has(callSid)) {
|
||||
return this.callStates.get(callSid);
|
||||
}
|
||||
|
||||
// Fallback to database
|
||||
const tenantKnex = await this.tenantDbService.getTenantKnexById(tenantId);
|
||||
const call = await tenantKnex('calls')
|
||||
.where({ call_sid: callSid })
|
||||
.first();
|
||||
|
||||
return call || null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Update call status from webhook
|
||||
*/
|
||||
async updateCallStatus(params: {
|
||||
callSid: string;
|
||||
tenantId: string;
|
||||
status: string;
|
||||
duration?: number;
|
||||
recordingUrl?: string;
|
||||
}) {
|
||||
const { callSid, tenantId, status, duration, recordingUrl } = params;
|
||||
|
||||
try {
|
||||
const tenantKnex = await this.tenantDbService.getTenantKnexById(tenantId);
|
||||
|
||||
const updateData: any = {
|
||||
status,
|
||||
updated_at: tenantKnex.fn.now(),
|
||||
};
|
||||
|
||||
if (duration !== undefined) {
|
||||
updateData.duration_seconds = duration;
|
||||
}
|
||||
|
||||
if (recordingUrl) {
|
||||
updateData.recording_url = recordingUrl;
|
||||
}
|
||||
|
||||
if (status === 'completed') {
|
||||
updateData.ended_at = tenantKnex.fn.now();
|
||||
}
|
||||
|
||||
await tenantKnex('calls')
|
||||
.where({ call_sid: callSid })
|
||||
.update(updateData);
|
||||
|
||||
// Update state
|
||||
const state = this.callStates.get(callSid);
|
||||
if (state) {
|
||||
this.callStates.set(callSid, { ...state, status });
|
||||
}
|
||||
|
||||
this.logger.log(`Call status updated: ${callSid} -> ${status}`);
|
||||
} catch (error) {
|
||||
this.logger.error('Failed to update call status', error);
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Initialize OpenAI Realtime connection for call
|
||||
*/
|
||||
async initializeOpenAIRealtime(params: {
|
||||
callSid: string;
|
||||
tenantId: string;
|
||||
userId: string;
|
||||
}) {
|
||||
const { callSid, tenantId, userId } = params;
|
||||
|
||||
try {
|
||||
// Get OpenAI config
|
||||
const centralPrisma = getCentralPrisma();
|
||||
const tenant = await centralPrisma.tenant.findUnique({
|
||||
where: { id: tenantId },
|
||||
select: { integrationsConfig: true },
|
||||
});
|
||||
|
||||
const config = this.getIntegrationConfig(tenant?.integrationsConfig as any);
|
||||
|
||||
if (!config.openai?.apiKey) {
|
||||
this.logger.warn('OpenAI not configured for tenant, skipping AI features');
|
||||
return;
|
||||
}
|
||||
|
||||
// Connect to OpenAI Realtime API
|
||||
const ws = new WebSocket('wss://api.openai.com/v1/realtime', {
|
||||
headers: {
|
||||
'Authorization': `Bearer ${config.openai.apiKey}`,
|
||||
'OpenAI-Beta': 'realtime=v1',
|
||||
},
|
||||
});
|
||||
|
||||
ws.on('open', () => {
|
||||
this.logger.log(`OpenAI Realtime connected for call ${callSid}`);
|
||||
|
||||
// Initialize session
|
||||
ws.send(JSON.stringify({
|
||||
type: 'session.update',
|
||||
session: {
|
||||
model: config.openai.model || 'gpt-4o-realtime-preview',
|
||||
voice: config.openai.voice || 'alloy',
|
||||
instructions: 'You are a helpful AI assistant providing real-time support during phone calls. Provide concise, actionable suggestions to help the user.',
|
||||
turn_detection: {
|
||||
type: 'server_vad',
|
||||
},
|
||||
tools: this.getOpenAITools(),
|
||||
},
|
||||
}));
|
||||
});
|
||||
|
||||
ws.on('message', (data: Buffer) => {
|
||||
this.handleOpenAIMessage(callSid, tenantId, userId, JSON.parse(data.toString()));
|
||||
});
|
||||
|
||||
ws.on('error', (error) => {
|
||||
this.logger.error(`OpenAI WebSocket error for call ${callSid}`, error);
|
||||
});
|
||||
|
||||
ws.on('close', () => {
|
||||
this.logger.log(`OpenAI Realtime disconnected for call ${callSid}`);
|
||||
this.openaiConnections.delete(callSid);
|
||||
});
|
||||
|
||||
this.openaiConnections.set(callSid, ws);
|
||||
} catch (error) {
|
||||
this.logger.error('Failed to initialize OpenAI Realtime', error);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle OpenAI Realtime messages
|
||||
*/
|
||||
private async handleOpenAIMessage(
|
||||
callSid: string,
|
||||
tenantId: string,
|
||||
userId: string,
|
||||
message: any,
|
||||
) {
|
||||
try {
|
||||
switch (message.type) {
|
||||
case 'conversation.item.created':
|
||||
if (message.item.type === 'message' && message.item.role === 'assistant') {
|
||||
// AI response generated
|
||||
this.logger.log(`AI response for call ${callSid}`);
|
||||
}
|
||||
break;
|
||||
|
||||
case 'response.audio_transcript.delta':
|
||||
// Real-time transcript
|
||||
// TODO: Emit to gateway
|
||||
break;
|
||||
|
||||
case 'response.audio_transcript.done':
|
||||
// Final transcript
|
||||
const transcript = message.transcript;
|
||||
await this.updateCallTranscript(callSid, tenantId, transcript);
|
||||
break;
|
||||
|
||||
case 'response.function_call_arguments.done':
|
||||
// Tool call completed
|
||||
await this.handleToolCall(callSid, tenantId, userId, message);
|
||||
break;
|
||||
|
||||
default:
|
||||
// Handle other message types
|
||||
break;
|
||||
}
|
||||
} catch (error) {
|
||||
this.logger.error('Failed to handle OpenAI message', error);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Define OpenAI tools for CRM actions
|
||||
*/
|
||||
private getOpenAITools(): any[] {
|
||||
return [
|
||||
{
|
||||
type: 'function',
|
||||
name: 'search_contact',
|
||||
description: 'Search for a contact by name, email, or phone number',
|
||||
parameters: {
|
||||
type: 'object',
|
||||
properties: {
|
||||
query: {
|
||||
type: 'string',
|
||||
description: 'Search query (name, email, or phone)',
|
||||
},
|
||||
},
|
||||
required: ['query'],
|
||||
},
|
||||
},
|
||||
{
|
||||
type: 'function',
|
||||
name: 'create_task',
|
||||
description: 'Create a follow-up task based on the call',
|
||||
parameters: {
|
||||
type: 'object',
|
||||
properties: {
|
||||
title: {
|
||||
type: 'string',
|
||||
description: 'Task title',
|
||||
},
|
||||
description: {
|
||||
type: 'string',
|
||||
description: 'Task description',
|
||||
},
|
||||
dueDate: {
|
||||
type: 'string',
|
||||
description: 'Due date (ISO format)',
|
||||
},
|
||||
},
|
||||
required: ['title'],
|
||||
},
|
||||
},
|
||||
{
|
||||
type: 'function',
|
||||
name: 'update_contact',
|
||||
description: 'Update contact information',
|
||||
parameters: {
|
||||
type: 'object',
|
||||
properties: {
|
||||
contactId: {
|
||||
type: 'string',
|
||||
description: 'Contact ID',
|
||||
},
|
||||
fields: {
|
||||
type: 'object',
|
||||
description: 'Fields to update',
|
||||
},
|
||||
},
|
||||
required: ['contactId', 'fields'],
|
||||
},
|
||||
},
|
||||
];
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle tool calls from OpenAI
|
||||
*/
|
||||
private async handleToolCall(
|
||||
callSid: string,
|
||||
tenantId: string,
|
||||
userId: string,
|
||||
message: any,
|
||||
) {
|
||||
// TODO: Implement actual tool execution
|
||||
// This would call the appropriate services based on the tool name
|
||||
// Respecting RBAC permissions for the user
|
||||
this.logger.log(`Tool call for call ${callSid}: ${message.name}`);
|
||||
}
|
||||
|
||||
/**
|
||||
* Update call transcript
|
||||
*/
|
||||
private async updateCallTranscript(
|
||||
callSid: string,
|
||||
tenantId: string,
|
||||
transcript: string,
|
||||
) {
|
||||
try {
|
||||
const tenantKnex = await this.tenantDbService.getTenantKnexById(tenantId);
|
||||
await tenantKnex('calls')
|
||||
.where({ call_sid: callSid })
|
||||
.update({
|
||||
ai_transcript: transcript,
|
||||
updated_at: tenantKnex.fn.now(),
|
||||
});
|
||||
} catch (error) {
|
||||
this.logger.error('Failed to update transcript', error);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get call history for user
|
||||
*/
|
||||
async getCallHistory(tenantId: string, userId: string, limit = 50) {
|
||||
try {
|
||||
const tenantKnex = await this.tenantDbService.getTenantKnexById(tenantId);
|
||||
const calls = await tenantKnex('calls')
|
||||
.where({ user_id: userId })
|
||||
.orderBy('created_at', 'desc')
|
||||
.limit(limit);
|
||||
|
||||
return calls;
|
||||
} catch (error) {
|
||||
this.logger.error('Failed to get call history', error);
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user