| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179 |
- import { ref } from 'vue'
- import hookFetch from 'hook-fetch'
- import { sseTextDecoderPlugin } from 'hook-fetch/plugins/sse'
- import type { ChatSseMessage } from '../types'
/**
 * Module-level hook-fetch client used for the chat stream requests in this file.
 *
 * It is created with an empty `baseURL` (request URLs are passed through
 * unprefixed by the client), `timeout: 0`, and a JSON `Content-Type` header.
 * NOTE(review): `timeout: 0` presumably disables the client-side timeout so
 * long-lived streams are not cut off; confirm against the hook-fetch docs.
 */
const api = hookFetch.create({
  baseURL: '',
  timeout: 0,
  headers: { 'Content-Type': 'application/json;charset=utf-8' }
})

// Decoder plugin for server-sent events. With `json: false` the payloads are
// presumably handed over as text rather than parsed JSON (confirm against
// the plugin docs).
const sseDecoder = sseTextDecoderPlugin({ json: false })
api.use(sseDecoder)
/** Matches the `x-sessionId` cookie in `document.cookie`; the value is capture group 2. */
const SESSION_COOKIE_PATTERN = /(^| )x-sessionId=([^;]*)(;|$)/

/** Narrows an unknown value to a non-null object that is not an array. */
function isNonArrayObject(value: unknown): value is object {
  return typeof value === 'object' && value !== null && !Array.isArray(value)
}

/** Returns true when the value is an object whose `name` is `AbortError`. */
function isAbortError(error: unknown): boolean {
  return (
    typeof error === 'object' &&
    error !== null &&
    (error as { name?: unknown }).name === 'AbortError'
  )
}

/** Wraps plain text in the message shape used for answer chunks. */
function toAnswerMessage(content: string): ChatSseMessage {
  return { response_type: 'answer', content, done: false }
}

/**
 * Turns one candidate error field into display text.
 *
 * Strings are trimmed, non-zero numbers are stringified, and objects contribute
 * their own `message` / `msg` string instead of being rendered as
 * "[object Object]". Anything else yields an empty string.
 */
function messageFromCandidate(candidate: unknown): string {
  if (typeof candidate === 'string') return candidate.trim()
  if (typeof candidate === 'number') return candidate ? `${candidate}` : ''
  if (typeof candidate === 'object' && candidate !== null) {
    const nested = candidate as { message?: unknown; msg?: unknown }
    for (const value of [nested.message, nested.msg]) {
      if (typeof value === 'string' && value.trim()) return value.trim()
    }
  }
  return ''
}

/**
 * Extracts a human-readable error message from a response or error payload.
 *
 * Fields are tried in this order: `error`, `errors.message`, `message`, `msg`.
 *
 * @param payload - Any value; non-objects produce an empty string.
 * @returns The first non-empty message found, or `''`.
 */
function getErrorMessage(payload: unknown): string {
  if (typeof payload !== 'object' || payload === null) return ''
  const source = payload as {
    error?: unknown
    errors?: unknown
    message?: unknown
    msg?: unknown
  }
  const nestedErrors = isNonArrayObject(source.errors)
    ? (source.errors as { message?: unknown }).message
    : undefined
  for (const candidate of [source.error, nestedErrors, source.message, source.msg]) {
    const text = messageFromCandidate(candidate)
    if (text) return text
  }
  return ''
}

/**
 * Parses one SSE block into chat messages.
 *
 * - Without any `data:` line, the trimmed block itself becomes an answer message
 *   (or nothing when it is blank).
 * - With `data:` lines, their payloads are joined with newlines and parsed as JSON.
 *   A JSON object is returned as the message; invalid JSON and non-object JSON
 *   (numbers, strings, arrays, `null`) are delivered as answer text so callers
 *   always receive a message object. An empty payload yields no message.
 *
 * @param block - Raw text of a single SSE block.
 * @returns Zero or one message.
 */
function parseSseBlock(block: string): ChatSseMessage[] {
  const dataLines = block
    .split(/\r?\n/)
    .filter((line) => line.startsWith('data:'))
    .map((line) => line.replace(/^data:\s?/, ''))

  if (dataLines.length === 0) {
    const text = block.trim()
    return text ? [toAnswerMessage(text)] : []
  }

  const data = dataLines.join('\n').trim()
  if (!data) return []

  let parsed: unknown
  try {
    parsed = JSON.parse(data)
  } catch {
    return [toAnswerMessage(data)]
  }
  return isNonArrayObject(parsed) ? [parsed as ChatSseMessage] : [toAnswerMessage(data)]
}

/**
 * Detects a plain (non-stream) failure response delivered in place of a chunk.
 *
 * A string is considered only when it looks like a JSON object (`{...}`) and
 * parses; an object is used as is. Payloads carrying `response_type` or `data`
 * are treated as regular content. Only `isSuccess === false` produces an error.
 *
 * @param result - The raw chunk result.
 * @returns An `Error` describing the failure, or `null` when the chunk is not one.
 */
function normalizeDirectResponseError(result: unknown): Error | null {
  let payload: unknown = null
  if (typeof result === 'string') {
    const text = result.trim()
    if (text.startsWith('{') && text.endsWith('}')) {
      try {
        payload = JSON.parse(text)
      } catch {
        payload = null
      }
    }
  } else if (typeof result === 'object') {
    payload = result
  }

  if (!isNonArrayObject(payload)) return null
  const envelope = payload as { response_type?: unknown; data?: unknown; isSuccess?: unknown }
  if (envelope.response_type || envelope.data) return null
  if (envelope.isSuccess !== false) return null
  return new Error(getErrorMessage(payload) || 'Request failed')
}

/**
 * Converts any thrown or reported value into an `Error`.
 *
 * `Error` instances are returned untouched (so their `name` survives), strings
 * become the message, and objects use their extracted message, then their
 * `status`, then a generic fallback.
 */
function normalizeRequestError(error: unknown): Error {
  if (!error) return new Error('Unknown error')
  if (error instanceof Error) return error
  if (typeof error === 'string') return new Error(error.trim() || 'Request failed')
  const status = typeof error === 'object' ? (error as { status?: unknown }).status : undefined
  const message =
    getErrorMessage(error) || (status ? `HTTP error! status: ${status}` : 'Request failed')
  return new Error(message)
}

/**
 * Composable that runs one chat stream at a time over the module-level `api` client.
 *
 * @returns
 * - `isLoading`: ref that is `true` while the most recently started stream is running.
 * - `cancelRequest`: aborts the stream in flight, if any.
 * - `streamChat`: starts a new stream, cancelling the previous one first.
 */
export function useChatStream() {
  const isLoading = ref(false)
  let activeController: AbortController | null = null
  let activeStreamId = 0
  const cancelledStreamIds = new Set<number>()

  /** Aborts the stream in flight (if any) and clears the loading flag. */
  const cancelRequest = () => {
    // Record a cancellation only while a stream is actually running. Ids of
    // finished streams are never removed again, so adding them would make the
    // set grow with every call.
    if (activeController) {
      cancelledStreamIds.add(activeStreamId)
      activeController.abort()
    }
    // NOTE(review): `api` is module-level, so this presumably aborts requests
    // started through other instances of this composable too; confirm against
    // the hook-fetch `abortAll` documentation.
    api.abortAll()
    isLoading.value = false
  }

  /**
   * POSTs `body` to `url` and forwards every decoded message to `onChunk`.
   *
   * The stream counts as finished once a message with `response_type === 'complete'`
   * arrives; the request is then aborted and `onComplete` is called. `onComplete`
   * is also called (at most once) when the stream was cancelled through
   * `cancelRequest`, and no further chunks are delivered after cancellation.
   * `onError` is called for reported or thrown failures and when the stream ends
   * without a `complete` message. An `AbortError` that belongs to neither a
   * completed nor a cancelled stream is ignored.
   *
   * @param url - Request path, appended to the dev base URL when running in dev mode.
   * @param body - Request payload, forwarded unchanged to `api.post`.
   * @param onChunk - Receives each decoded message, including the `complete` one.
   * @param onComplete - Called once when the stream completes or was cancelled.
   * @param onError - Called with a normalized `Error` on failure.
   */
  const streamChat = async (
    url: string,
    body: any,
    onChunk: (chunk: ChatSseMessage) => void,
    onComplete: () => void,
    onError: (err: Error) => void
  ) => {
    cancelRequest()
    isLoading.value = true

    const streamId = activeStreamId + 1
    activeStreamId = streamId
    const controller = new AbortController()
    activeController = controller

    let completeReceived = false
    let completed = false
    const finishStream = () => {
      if (completed) return
      completed = true
      onComplete()
    }
    const wasCancelled = () => cancelledStreamIds.has(streamId)

    try {
      // The session id cookie doubles as the Authorization header value.
      const token = document.cookie.match(SESSION_COOKIE_PATTERN)?.[2]
      // In dev, the host comes from the VITE_BASE_URL environment variable;
      // in prod, requests go to the current origin.
      const baseUrl = import.meta.env.DEV ? `http://${import.meta.env.VITE_BASE_URL}` : ''
      const request = api.post(baseUrl + url, body, {
        signal: controller.signal,
        headers: {
          Authorization: token ?? ''
        },
        credentials: 'include'
      })

      for await (const chunk of request.stream<string>()) {
        // A cancelled stream must not deliver late chunks to its caller.
        if (wasCancelled()) break
        if (chunk.error) {
          throw normalizeRequestError(chunk.error)
        }

        const raw: unknown = chunk.result
        const directResponseError = normalizeDirectResponseError(raw)
        if (directResponseError) {
          throw directResponseError
        }

        let messages: ChatSseMessage[]
        if (isNonArrayObject(raw)) {
          // Already-decoded payload: pass it on instead of stringifying it
          // into "[object Object]".
          messages = [raw as ChatSseMessage]
        } else {
          const text = typeof raw === 'string' ? raw : `${raw ?? ''}`
          if (!text.trim()) continue
          messages = parseSseBlock(text)
        }

        for (const message of messages) {
          onChunk(message)
        }
        if (messages.some((message) => message.response_type === 'complete')) {
          completeReceived = true
          // Nothing useful follows `complete`; close the connection.
          controller.abort()
          break
        }
      }

      // A cancelled stream may end without throwing AbortError; treat it the
      // same way as the AbortError path below.
      if (completeReceived || wasCancelled()) {
        finishStream()
      } else {
        throw new Error('Response stream ended before complete')
      }
    } catch (error: unknown) {
      if (isAbortError(error)) {
        if (completeReceived || wasCancelled()) {
          finishStream()
        }
      } else {
        onError(normalizeRequestError(error))
      }
    } finally {
      // Only the most recently started stream owns the shared state.
      if (activeStreamId === streamId) {
        activeController = null
        isLoading.value = false
      }
      cancelledStreamIds.delete(streamId)
    }
  }

  return {
    isLoading,
    cancelRequest,
    streamChat
  }
}
|