useChatStream.ts 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179
  1. import { ref } from 'vue'
  2. import hookFetch from 'hook-fetch'
  3. import { sseTextDecoderPlugin } from 'hook-fetch/plugins/sse'
  4. import type { ChatSseMessage } from '../types'
  5. const api = hookFetch.create({
  6. baseURL: '',
  7. timeout: 0,
  8. headers: {
  9. 'Content-Type': 'application/json;charset=utf-8'
  10. }
  11. })
  12. api.use(sseTextDecoderPlugin({ json: false }))
  13. export function useChatStream() {
  14. const isLoading = ref(false)
  15. let activeController: AbortController | null = null
  16. let activeStreamId = 0
  17. const cancelledStreamIds = new Set<number>()
  18. const cancelRequest = () => {
  19. if (activeStreamId) {
  20. cancelledStreamIds.add(activeStreamId)
  21. }
  22. activeController?.abort()
  23. api.abortAll()
  24. isLoading.value = false
  25. }
  26. const parseSseBlock = (block: string): ChatSseMessage[] => {
  27. const dataLines = block
  28. .split(/\r?\n/)
  29. .filter((line) => line.startsWith('data:'))
  30. .map((line) => line.replace(/^data:\s?/, ''))
  31. if (!dataLines.length) {
  32. const text = block.trim()
  33. return text ? [{ response_type: 'answer', content: text, done: false }] : []
  34. }
  35. const data = dataLines.join('\n').trim()
  36. console.log('msg:', data)
  37. try {
  38. return [JSON.parse(data)]
  39. } catch {
  40. return [{ response_type: 'answer', content: data, done: false }]
  41. }
  42. }
  43. const getErrorMessage = (payload: any) => {
  44. if (!payload || typeof payload !== 'object') return ''
  45. return `${payload.error || payload.errors?.message || payload.message || payload.msg || ''}`.trim()
  46. }
  47. const normalizeDirectResponseError = (result: unknown) => {
  48. const payload =
  49. typeof result === 'string'
  50. ? (() => {
  51. const text = result.trim()
  52. if (!text.startsWith('{') || !text.endsWith('}')) return null
  53. try {
  54. return JSON.parse(text)
  55. } catch {
  56. return null
  57. }
  58. })()
  59. : result && typeof result === 'object'
  60. ? result
  61. : null
  62. if (!payload || Array.isArray(payload)) return null
  63. if (payload.response_type || payload.data) return null
  64. if (payload.isSuccess === false) {
  65. return new Error(getErrorMessage(payload) || 'Request failed')
  66. }
  67. return null
  68. }
  69. const normalizeRequestError = async (error: any) => {
  70. if (!error) return new Error('Unknown error')
  71. if (error instanceof Error) return error
  72. const message =
  73. getErrorMessage(error) ||
  74. (error?.status ? `HTTP error! status: ${error.status}` : 'Request failed')
  75. return new Error(message)
  76. }
  77. const streamChat = async (
  78. url: string,
  79. body: any,
  80. onChunk: (chunk: ChatSseMessage) => void,
  81. onComplete: () => void,
  82. onError: (err: Error) => void
  83. ) => {
  84. cancelRequest()
  85. isLoading.value = true
  86. const streamId = activeStreamId + 1
  87. activeStreamId = streamId
  88. const controller = new AbortController()
  89. activeController = controller
  90. let completeReceived = false
  91. let completed = false
  92. const finishStream = () => {
  93. if (!completed) {
  94. completed = true
  95. onComplete()
  96. }
  97. }
  98. try {
  99. const token = // localStorage.getItem('oauth2token') ||
  100. document.cookie.match(new RegExp('(^| )' + 'x-sessionId' + '=([^;]*)(;|$)'))?.[2]
  101. // dev读取环境变量 prod使用当前
  102. const baseUrl = import.meta.env.DEV ? `http://${import.meta.env.VITE_BASE_URL}` : ''
  103. const request = api.post(baseUrl + url, body, {
  104. signal: controller.signal,
  105. headers: {
  106. Authorization: token || ''
  107. },
  108. credentials: 'include'
  109. })
  110. for await (const chunk of request.stream<string>()) {
  111. if (chunk.error) {
  112. throw await normalizeRequestError(chunk.error)
  113. }
  114. const directResponseError = normalizeDirectResponseError(chunk.result)
  115. if (directResponseError) {
  116. throw directResponseError
  117. }
  118. const result = typeof chunk.result === 'string' ? chunk.result : `${chunk.result ?? ''}`
  119. if (!result.trim()) continue
  120. const results = parseSseBlock(result)
  121. results.forEach(onChunk)
  122. if (results.some((item) => item?.response_type === 'complete')) {
  123. completeReceived = true
  124. controller.abort()
  125. break
  126. }
  127. }
  128. if (completeReceived) {
  129. finishStream()
  130. } else if (!cancelledStreamIds.has(streamId)) {
  131. throw new Error('Response stream ended before complete')
  132. }
  133. } catch (error: any) {
  134. if (error?.name === 'AbortError' && (completeReceived || cancelledStreamIds.has(streamId))) {
  135. finishStream()
  136. } else if (error?.name !== 'AbortError') {
  137. onError(await normalizeRequestError(error))
  138. }
  139. } finally {
  140. if (activeStreamId === streamId) {
  141. activeController = null
  142. isLoading.value = false
  143. }
  144. cancelledStreamIds.delete(streamId)
  145. }
  146. }
  147. return {
  148. isLoading,
  149. cancelRequest,
  150. streamChat
  151. }
  152. }