/** * Utility functions for handling streaming responses from different providers */ // Function to decode OpenAI streaming data export function decodeOpenAIStreamChunk(chunk: Uint8Array): string { const decoder = new TextDecoder('utf-8'); const decoded = decoder.decode(chunk); // OpenAI sends 'data: ' prefixed chunks, one per line // Extract the actual JSON data const lines = decoded .split('\n') .filter(line => line.trim() !== '' && line.trim() !== 'data: [DONE]') .map(line => line.replace(/^data: /, '')); if (lines.length === 0) return ''; try { // Each line should be a JSON object with choices const parsedLines = lines.map(line => JSON.parse(line)); // Extract content delta from each chunk const contents = parsedLines .map(obj => obj.choices?.[0]?.delta?.content || '') .join(''); return contents; } catch (error) { console.error('Error parsing stream data:', error); return ''; } } // Function to decode Anthropic streaming data export function decodeAnthropicStreamChunk(chunk: Uint8Array): string { const decoder = new TextDecoder('utf-8'); const decoded = decoder.decode(chunk); // Anthropic sends 'event: content_block_delta' and 'data: {"type":"content_block_delta",...}' const contentLines = decoded .split('\n') .filter(line => line.startsWith('data:')) .map(line => line.replace(/^data: /, '')); if (contentLines.length === 0) return ''; try { const parsedLines = contentLines.map(line => JSON.parse(line)); // Extract content delta from each chunk const contents = parsedLines .map(obj => { if (obj.type === 'content_block_delta') { return obj.delta?.text || ''; } return ''; }) .join(''); return contents; } catch (error) { console.error('Error parsing Anthropic stream data:', error); return ''; } } // Function to decode Google streaming data export function decodeGoogleStreamChunk(chunk: Uint8Array): string { const decoder = new TextDecoder('utf-8'); const decoded = decoder.decode(chunk); try { const data = JSON.parse(decoded); // Extract content from Google's response format return data.candidates?.[0]?.content?.parts?.[0]?.text || data.candidates?.[0]?.content?.parts?.[0]?.delta?.text || ''; } catch (error) { console.error('Error parsing Google stream data:', error); return ''; } } // Helper to process streams generically export async function processStream( stream: ReadableStream, onChunk: (text: string) => void, decodeFunction: (chunk: Uint8Array) => string ): Promise { const reader = stream.getReader(); try { while (true) { const { done, value } = await reader.read(); if (done) break; // Decode the chunk according to the provider format const text = decodeFunction(value); if (text) { onChunk(text); } } } catch (error) { console.error('Error reading from stream:', error); throw error; } finally { reader.releaseLock(); } }