diff --git a/.env.example b/.env.example index 5c7401b4..5e80a8db 100644 --- a/.env.example +++ b/.env.example @@ -3,6 +3,9 @@ NEXT_PUBLIC_SUPABASE_URL=your_supabase_project_url NEXT_PUBLIC_SUPABASE_ANON_KEY=your_supabase_anon_key SUPABASE_SERVICE_ROLE=your_supabase_service_role_key +# Redis Configuration +REDIS_URL=redis://localhost:6379 + # CSRF Protection (required) CSRF_SECRET=your_32_character_random_string diff --git a/INSTALL.md b/INSTALL.md index 0fa6235b..5a56cfdc 100644 --- a/INSTALL.md +++ b/INSTALL.md @@ -22,6 +22,9 @@ NEXT_PUBLIC_SUPABASE_URL=your_supabase_url NEXT_PUBLIC_SUPABASE_ANON_KEY=your_supabase_anon_key SUPABASE_SERVICE_ROLE=your_supabase_service_role_key +# Redis +REDIS_URL=redis://localhost:6379 + # OpenAI OPENAI_API_KEY=your_openai_api_key @@ -151,6 +154,14 @@ Here are the detailed steps to set up Google OAuth: This allows users limited access to try the product before properly creating an account. +### Redis + +Start a redis via docker: + +```bash +docker run -p 6379:6379 -d redis:8.0-rc1 +``` + ### Database Schema Create the following tables in your Supabase SQL editor: @@ -200,6 +211,15 @@ CREATE TABLE chats ( CONSTRAINT chats_project_id_fkey FOREIGN KEY (project_id) REFERENCES projects(id) ON DELETE CASCADE ); +-- Stream IDs table +CREATE TABLE stream_ids ( + id SERIAL PRIMARY KEY, + stream_id UUID NOT NULL, + chat_id UUID NOT NULL, + created_at TIMESTAMPTZ DEFAULT NOW(), + CONSTRAINT stream_ids_chat_id_fkey FOREIGN KEY (chat_id) REFERENCES chats(id) ON DELETE CASCADE +); + -- Messages table CREATE TABLE messages ( id SERIAL PRIMARY KEY, -- Using SERIAL for auto-incrementing integer ID diff --git a/app/api/chat/db.ts b/app/api/chat/db.ts index 574a556e..d9cec9a0 100644 --- a/app/api/chat/db.ts +++ b/app/api/chat/db.ts @@ -90,3 +90,24 @@ export async function saveFinalAssistantMessage( console.log("Assistant message saved successfully (merged).") } } + +export async function createStreamId( + supabase: SupabaseClient, + { + streamId, + chatId, + }: { + streamId: string + chatId: string + } +) { + const { error } = await supabase.from("stream_ids").insert({ + stream_id: streamId, + chat_id: chatId, + }) + + if (error) { + console.error("Error creating stream ID:", error) + throw new Error(`Failed to create stream ID: ${error.message}`) + } +} diff --git a/app/api/chat/route.ts b/app/api/chat/route.ts index f4358b99..9e137ee9 100644 --- a/app/api/chat/route.ts +++ b/app/api/chat/route.ts @@ -1,19 +1,34 @@ +import { getMessagesFromDb } from "@/lib/chat-store/messages/api" import { SYSTEM_PROMPT_DEFAULT } from "@/lib/config" import { getAllModels } from "@/lib/models" import { getProviderForModel } from "@/lib/openproviders/provider-map" +import { createClient } from "@/lib/supabase/server" import type { ProviderWithoutOllama } from "@/lib/user-keys" +import { generateUUID } from "@/lib/utils" import { Attachment } from "@ai-sdk/ui-utils" -import { Message as MessageAISDK, streamText, ToolSet } from "ai" +import { + createDataStream, + Message as MessageAISDK, + streamText, + ToolSet, +} from "ai" +import { after, NextResponse } from "next/server" +import { createResumableStreamContext } from "resumable-stream" import { incrementMessageCount, logUserMessage, storeAssistantMessage, validateAndTrackUsage, } from "./api" +import { createStreamId } from "./db" import { createErrorResponse, extractErrorMessage } from "./utils" export const maxDuration = 60 +const streamContext = createResumableStreamContext({ + waitUntil: after, +}) + type ChatRequest = { messages: MessageAISDK[] chatId: string @@ -89,39 +104,51 @@ export async function POST(req: Request) { undefined } - const result = streamText({ - model: modelConfig.apiSdk(apiKey, { enableSearch }), - system: effectiveSystemPrompt, - messages: messages, - tools: {} as ToolSet, - maxSteps: 10, - onError: (err: unknown) => { - console.error("Streaming error occurred:", err) - // Don't set streamError anymore - let the AI SDK handle it through the stream - }, + const streamId = generateUUID() + await createStreamId(supabase!, { streamId, chatId }) - onFinish: async ({ response }) => { - if (supabase) { - await storeAssistantMessage({ - supabase, - chatId, - messages: - response.messages as unknown as import("@/app/types/api.types").Message[], - message_group_id, - model, - }) - } - }, - }) + const stream = createDataStream({ + execute: (dataStream) => { + const result = streamText({ + model: modelConfig.apiSdk!(apiKey, { enableSearch }), + system: effectiveSystemPrompt, + messages: messages, + tools: {} as ToolSet, + maxSteps: 10, + onError: (err: unknown) => { + console.error("Streaming error occurred:", err) + // Don't set streamError anymore - let the AI SDK handle it through the stream + }, - return result.toDataStreamResponse({ - sendReasoning: true, - sendSources: true, - getErrorMessage: (error: unknown) => { - console.error("Error forwarded to client:", error) - return extractErrorMessage(error) + onFinish: async ({ response }) => { + if (supabase) { + await storeAssistantMessage({ + supabase, + chatId, + messages: + response.messages as unknown as import("@/app/types/api.types").Message[], + message_group_id, + model, + }) + } + }, + }) + result.consumeStream() + + result.mergeIntoDataStream(dataStream, { + sendReasoning: true, + sendSources: true, + }) + }, + onError: (err: unknown) => { + console.error("Error forwarded to client:", err) + return extractErrorMessage(err) }, }) + + return new Response( + await streamContext.resumableStream(streamId, () => stream) + ) } catch (err: unknown) { console.error("Error in /api/chat:", err) const error = err as { @@ -133,3 +160,105 @@ export async function POST(req: Request) { return createErrorResponse(error) } } + +export async function GET(req: Request) { + try { + const { searchParams } = new URL(req.url) + const chatId = searchParams.get("chatId") + + if (!chatId) { + return new Response("id is required", { status: 400 }) + } + const supabase = await createClient() + + if (!supabase) { + return NextResponse.json( + { error: "Database connection failed" }, + { status: 500 } + ) + } + + // Get the current user + const { + data: { user }, + error: authError, + } = await supabase.auth.getUser() + + if (authError || !user) { + return NextResponse.json({ error: "Unauthorized" }, { status: 401 }) + } + + // Get the chat by id + const chat = await supabase + .from("chats") + .select("*") + .eq("id", chatId) + .single() + + if (!chat.data) { + return new Response("Not found", { status: 404 }) + } + + if (chat.data.user_id !== user.id) { + return new Response("Forbidden", { status: 403 }) + } + + // Get the stream ids by chat id + const streamIds = await supabase + .from("stream_ids") + .select("*") + .eq("chat_id", chatId) + + if (!streamIds.data) { + return new Response("No streams found", { status: 404 }) + } + + const recentStreamId = streamIds.data.at(-1)?.stream_id + + if (!recentStreamId) { + return new Response("No recent stream found", { status: 404 }) + } + + const emptyDataStream = createDataStream({ + execute: () => {}, + }) + + const stream = await streamContext.resumableStream( + recentStreamId, + () => emptyDataStream + ) + + if (stream) { + return new Response(stream, { status: 200 }) + } + + /* + * For when the generation is "active" during SSR but the + * resumable stream has concluded after reaching this point. + */ + + const messages = await getMessagesFromDb(chatId) + const mostRecentMessage = messages.at(-1) + + if (!mostRecentMessage || mostRecentMessage.role !== "assistant") { + return new Response(emptyDataStream, { status: 200 }) + } + + const streamWithMessage = createDataStream({ + execute: (buffer) => { + buffer.writeData({ + type: "append-message", + message: JSON.stringify(mostRecentMessage), + }) + }, + }) + + return new Response(streamWithMessage, { status: 200 }) + } catch (error) { + console.error("Error in chat GET API:", error) + return NextResponse.json( + { error: "Internal server error" }, + { status: 500 } + ) + } +} diff --git a/app/c/[chatId]/page.tsx b/app/c/[chatId]/page.tsx index dfcf95b4..ac25ab7b 100644 --- a/app/c/[chatId]/page.tsx +++ b/app/c/[chatId]/page.tsx @@ -1,11 +1,19 @@ import { ChatContainer } from "@/app/components/chat/chat-container" import { LayoutApp } from "@/app/components/layout/layout-app" +import { getMessagesFromDb } from "@/lib/chat-store/messages/api" import { MessagesProvider } from "@/lib/chat-store/messages/provider" import { isSupabaseEnabled } from "@/lib/supabase/config" import { createClient } from "@/lib/supabase/server" +import { Message as MessageAISDK } from "ai" import { redirect } from "next/navigation" -export default async function Page() { +export default async function Page({ + params, +}: { + params: Promise<{ chatId: string }> +}) { + let initialMessages: MessageAISDK[] = [] + const { chatId } = await params if (isSupabaseEnabled) { const supabase = await createClient() if (supabase) { @@ -13,13 +21,14 @@ export default async function Page() { if (userError || !userData?.user) { redirect("/") } + initialMessages = await getMessagesFromDb(chatId) } } return ( - + ) diff --git a/app/components/chat/chat-container.tsx b/app/components/chat/chat-container.tsx index 7d6ed679..3453ed45 100644 --- a/app/components/chat/chat-container.tsx +++ b/app/components/chat/chat-container.tsx @@ -2,9 +2,16 @@ import { MultiChat } from "@/app/components/multi-chat/multi-chat" import { useUserPreferences } from "@/lib/user-preference-store/provider" +import type { Message as MessageAISDK } from "ai" import { Chat } from "./chat" -export function ChatContainer() { +export function ChatContainer({ + initialMessages, + autoResume, +}: { + initialMessages?: MessageAISDK[] + autoResume?: boolean +}) { const { preferences } = useUserPreferences() const multiModelEnabled = preferences.multiModelEnabled @@ -12,5 +19,5 @@ export function ChatContainer() { return } - return + return } diff --git a/app/components/chat/chat.tsx b/app/components/chat/chat.tsx index 182b683f..d62fd6a8 100644 --- a/app/components/chat/chat.tsx +++ b/app/components/chat/chat.tsx @@ -11,6 +11,7 @@ import { SYSTEM_PROMPT_DEFAULT } from "@/lib/config" import { useUserPreferences } from "@/lib/user-preference-store/provider" import { useUser } from "@/lib/user-store/provider" import { cn } from "@/lib/utils" +import type { Message as MessageAISDK } from "ai" import { AnimatePresence, motion } from "motion/react" import dynamic from "next/dynamic" import { redirect } from "next/navigation" @@ -29,7 +30,13 @@ const DialogAuth = dynamic( { ssr: false } ) -export function Chat() { +export function Chat({ + initialMessages: InitialMessagesFromDB, + autoResume, +}: { + autoResume?: boolean + initialMessages?: MessageAISDK[] +}) { const { chatId } = useChatSession() const { createNewChat, @@ -105,7 +112,7 @@ export function Chat() { handleReload, handleInputChange, } = useChatCore({ - initialMessages, + initialMessages: InitialMessagesFromDB || [], draftValue, cacheAndAddMessage, chatId, @@ -120,6 +127,7 @@ export function Chat() { selectedModel, clearDraft, bumpChat, + autoResume, }) // Memoize the conversation props to prevent unnecessary rerenders diff --git a/app/components/chat/use-chat-core.ts b/app/components/chat/use-chat-core.ts index 4799af17..4d54d7c2 100644 --- a/app/components/chat/use-chat-core.ts +++ b/app/components/chat/use-chat-core.ts @@ -1,3 +1,4 @@ +import { useAutoResume } from "@/app/hooks/use-auto-resume" import { useChatDraft } from "@/app/hooks/use-chat-draft" import { toast } from "@/components/ui/toast" import { getOrCreateGuestUserId } from "@/lib/api" @@ -31,6 +32,7 @@ type UseChatCoreProps = { selectedModel: string clearDraft: () => void bumpChat: (chatId: string) => void + autoResume?: boolean } export function useChatCore({ @@ -49,6 +51,7 @@ export function useChatCore({ selectedModel, clearDraft, bumpChat, + autoResume, }: UseChatCoreProps) { // State management const [isSubmitting, setIsSubmitting] = useState(false) @@ -96,7 +99,10 @@ export function useChatCore({ setMessages, setInput, append, + experimental_resume, + data, } = useChat({ + id: chatId!, api: API_ROUTE_CHAT, initialMessages, initialInput: draftValue, @@ -104,6 +110,14 @@ export function useChatCore({ onError: handleError, }) + useAutoResume({ + autoResume, + initialMessages, + experimental_resume, + data, + setMessages, + }) + // Handle search params on mount useEffect(() => { if (prompt && typeof window !== "undefined") { diff --git a/app/hooks/use-auto-resume.ts b/app/hooks/use-auto-resume.ts new file mode 100644 index 00000000..67baf33b --- /dev/null +++ b/app/hooks/use-auto-resume.ts @@ -0,0 +1,47 @@ +"use client" + +import type { UseChatHelpers } from "@ai-sdk/react" +import type { Message } from "ai" +import { useEffect } from "react" + +export type DataPart = { type: "append-message"; message: string } + +export interface Props { + autoResume?: boolean + initialMessages: Message[] + experimental_resume: UseChatHelpers["experimental_resume"] + data: UseChatHelpers["data"] + setMessages: UseChatHelpers["setMessages"] +} + +export function useAutoResume({ + autoResume = false, + initialMessages, + experimental_resume, + data, + setMessages, +}: Props) { + useEffect(() => { + if (!autoResume) return + + const mostRecentMessage = initialMessages.at(-1) + + if (mostRecentMessage?.role === "user") { + experimental_resume() + } + + // we intentionally run this once + // eslint-disable-next-line react-hooks/exhaustive-deps + }, []) + + useEffect(() => { + if (!data || data.length === 0) return + + const dataPart = data[0] as DataPart + + if (dataPart.type === "append-message") { + const message = JSON.parse(dataPart.message) as Message + setMessages([...initialMessages, message]) + } + }, [data, initialMessages, setMessages]) +} diff --git a/app/types/database.types.ts b/app/types/database.types.ts index 8c7d112f..1d041099 100644 --- a/app/types/database.types.ts +++ b/app/types/database.types.ts @@ -136,6 +136,35 @@ export type Database = { }, ] } + stream_ids: { + Row: { + id: string + stream_id: string + chat_id: string + created_at: string | null + } + Insert: { + id?: string + stream_id: string + chat_id: string + created_at?: string | null + } + Update: { + id?: string + stream_id?: string + chat_id?: string + created_at?: string | null + } + Relationships: [ + { + foreignKeyName: "stream_ids_chat_id_fkey" + columns: ["chat_id"] + isOneToOne: false + referencedRelation: "chats" + referencedColumns: ["id"] + }, + ] + } messages: { Row: { experimental_attachments: Attachment[] diff --git a/lib/utils.ts b/lib/utils.ts index 2d486a3e..44bb4fd2 100644 --- a/lib/utils.ts +++ b/lib/utils.ts @@ -35,3 +35,23 @@ export function debounce any>( } export const isDev = process.env.NODE_ENV === "development" + +/** + * Generates a UUID (Universally Unique Identifier) version 4. + * A UUID v4 is a 128-bit number used to uniquely identify information in computer systems. + * This version generates random UUIDs following RFC 4122. + * + * @returns {string} A string representing a UUID v4, in the format xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx, + * where 'x' is any hexadecimal digit and 'y' is one of 8, 9, A, or B. + * + * @example + * const uuid = generateUUID(); + * console.log(uuid); // e.g. '3b12f1df-5232-4e9f-bb6d-6d5c6ec9f53a' + */ +export function generateUUID(): string { + return "xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx".replace(/[xy]/g, (c) => { + const r = crypto.getRandomValues(new Uint8Array(1))[0] % 16 + const v = c === "x" ? r : (r & 0x3) | 0x8 + return v.toString(16) + }) +} diff --git a/package-lock.json b/package-lock.json index 5e1193fc..38041862 100644 --- a/package-lock.json +++ b/package-lock.json @@ -52,8 +52,10 @@ "react": "^19.0.0", "react-dom": "^19.0.0", "react-markdown": "^10.1.0", + "redis": "^5.6.0", "remark-breaks": "^4.0.0", "remark-gfm": "^4.0.1", + "resumable-stream": "^2.2.1", "shiki": "^3.4.0", "slugify": "^1.6.6", "sonner": "^2.0.1", @@ -2614,6 +2616,66 @@ "integrity": "sha512-HPwpGIzkl28mWyZqG52jiqDJ12waP11Pa1lGoiyUkIEuMLBP0oeK/C89esbXrxsky5we7dfd8U58nm0SgAWpVw==", "license": "MIT" }, + "node_modules/@redis/bloom": { + "version": "5.6.0", + "resolved": "https://registry.npmjs.org/@redis/bloom/-/bloom-5.6.0.tgz", + "integrity": "sha512-l13/d6BaZDJzogzZJEphIeZ8J0hpQpjkMiozomTm6nJiMNYkoPsNOBOOQua4QsG0fFjyPmLMDJFPAp5FBQtTXg==", + "license": "MIT", + "engines": { + "node": ">= 18" + }, + "peerDependencies": { + "@redis/client": "^5.6.0" + } + }, + "node_modules/@redis/client": { + "version": "5.6.0", + "resolved": "https://registry.npmjs.org/@redis/client/-/client-5.6.0.tgz", + "integrity": "sha512-wmP9kCFElCSr4MM4+1E4VckDuN4wLtiXSM/J0rKVQppajxQhowci89RGZr2OdLualowb8SRJ/R6OjsXrn9ZNFA==", + "license": "MIT", + "dependencies": { + "cluster-key-slot": "1.1.2" + }, + "engines": { + "node": ">= 18" + } + }, + "node_modules/@redis/json": { + "version": "5.6.0", + "resolved": "https://registry.npmjs.org/@redis/json/-/json-5.6.0.tgz", + "integrity": "sha512-YQN9ZqaSDpdLfJqwzcF4WeuJMGru/h4WsV7GeeNtXsSeyQjHTyDxrd48xXfRRJGv7HitA7zGnzdHplNeKOgrZA==", + "license": "MIT", + "engines": { + "node": ">= 18" + }, + "peerDependencies": { + "@redis/client": "^5.6.0" + } + }, + "node_modules/@redis/search": { + "version": "5.6.0", + "resolved": "https://registry.npmjs.org/@redis/search/-/search-5.6.0.tgz", + "integrity": "sha512-sLgQl92EyMVNHtri5K8Q0j2xt9c0cO9HYurXz667Un4xeUYR+B/Dw5lLG35yqO7VvVxb9amHJo9sAWumkKZYwA==", + "license": "MIT", + "engines": { + "node": ">= 18" + }, + "peerDependencies": { + "@redis/client": "^5.6.0" + } + }, + "node_modules/@redis/time-series": { + "version": "5.6.0", + "resolved": "https://registry.npmjs.org/@redis/time-series/-/time-series-5.6.0.tgz", + "integrity": "sha512-tXABmN1vu4aTNL3WI4Iolpvx/5jgil2Bs31ozvKblT+jkUoRkk8ykmYo9Pv/Mp7Gk6/Qkr/2rMgVminrt/4BBQ==", + "license": "MIT", + "engines": { + "node": ">= 18" + }, + "peerDependencies": { + "@redis/client": "^5.6.0" + } + }, "node_modules/@rtsao/scc": { "version": "1.1.0", "resolved": "https://registry.npmjs.org/@rtsao/scc/-/scc-1.1.0.tgz", @@ -4407,6 +4469,15 @@ "node": ">=6" } }, + "node_modules/cluster-key-slot": { + "version": "1.1.2", + "resolved": "https://registry.npmjs.org/cluster-key-slot/-/cluster-key-slot-1.1.2.tgz", + "integrity": "sha512-RMr0FhtfXemyinomL4hrWcYJxmX6deFdCxpJzhDttxgO1+bcCnkk+9drydLVDmAMG7NE6aN/fl4F7ucU/90gAA==", + "license": "Apache-2.0", + "engines": { + "node": ">=0.10.0" + } + }, "node_modules/cmdk": { "version": "1.1.1", "resolved": "https://registry.npmjs.org/cmdk/-/cmdk-1.1.1.tgz", @@ -9194,6 +9265,22 @@ } } }, + "node_modules/redis": { + "version": "5.6.0", + "resolved": "https://registry.npmjs.org/redis/-/redis-5.6.0.tgz", + "integrity": "sha512-0x3pM3SlYA5azdNwO8qgfMBzoOqSqr9M+sd1hojbcn0ZDM5zsmKeMM+zpTp6LIY+mbQomIc/RTTQKuBzr8QKzQ==", + "license": "MIT", + "dependencies": { + "@redis/bloom": "5.6.0", + "@redis/client": "5.6.0", + "@redis/json": "5.6.0", + "@redis/search": "5.6.0", + "@redis/time-series": "5.6.0" + }, + "engines": { + "node": ">= 18" + } + }, "node_modules/reflect.getprototypeof": { "version": "1.0.10", "resolved": "https://registry.npmjs.org/reflect.getprototypeof/-/reflect.getprototypeof-1.0.10.tgz", @@ -9384,6 +9471,12 @@ "url": "https://github.com/privatenumber/resolve-pkg-maps?sponsor=1" } }, + "node_modules/resumable-stream": { + "version": "2.2.1", + "resolved": "https://registry.npmjs.org/resumable-stream/-/resumable-stream-2.2.1.tgz", + "integrity": "sha512-8UOWqA+KjAOaZeeZa38lIhbNGM1B6ygO2QTi1BNcVO9vemQz1M0f2qVsnHqSnsLpJLfmOWNwq64TmBtOJxjCeQ==", + "license": "ISC" + }, "node_modules/reusify": { "version": "1.1.0", "resolved": "https://registry.npmjs.org/reusify/-/reusify-1.1.0.tgz", diff --git a/package.json b/package.json index 76cde6be..3f890ccc 100644 --- a/package.json +++ b/package.json @@ -54,8 +54,10 @@ "react": "^19.0.0", "react-dom": "^19.0.0", "react-markdown": "^10.1.0", + "redis": "^5.6.0", "remark-breaks": "^4.0.0", "remark-gfm": "^4.0.1", + "resumable-stream": "^2.2.1", "shiki": "^3.4.0", "slugify": "^1.6.6", "sonner": "^2.0.1",