Skip to content

Commit e990ce9

Browse files
committed
MB-70633 Implement conversational support...
... for USING AI statement. The changes propose to enhance the existing natural language request processing from one-shot requests, i.e., plain USING AI statements, to conversation-history-aware USING AI statements. Two new statements are introduced - 1. BEGIN CHAT required parameters: natural_context This statement will create a new chat session which can be used only by the user that made the request to create it. Every chat is uniquely identified with a chatid. The response of this request will include the chatid for the newly created chat session. 2. END CHAT required parameters: natural_chatid This statement searches for the chat with the chatid provided via the natural_chatid query parameter and then deletes the chat session if the requesting user matches the chat user. USING AI requests, when sent with natural_chatid, indicate that the request is part of a chat session. The request's prompt that is sent to the LLM API would include past messages, both from the user and the assistant. The response from the LLM is saved in the history messages as context for any new request that may come in. Monitoring chat sessions: A new system keyspace, natural_chats, stores information about ongoing chat sessions. The keyspace supports SELECT and DELETE. For this to work, the admin/natural_chats index handler and chat handler are introduced. Before processing a chat for retrieval or deletion, a check is made to ensure that the user matches the user that created the chat session. The current implementation only guarantees the availability of a chat session on the node where it was created. Chats are stored in memory using the GenCache struct. A limit of 2*numcpus on the number of ongoing chat sessions is imposed per node. For memory reasons, the size of the prompt for a chat USING AI request is limited to 1MiB. An error is returned once a chat's prompt outgrows this limit. 
Change-Id: Ieeeb5db2148b5a1bb9e90514be67a57014787edc Reviewed-on: https://review.couchbase.org/c/query/+/227034 Reviewed-by: Sitaram Vemulapalli <sitaram.vemulapalli@couchbase.com> Reviewed-by: Bingjie Miao <bingjie.miao@couchbase.com> Tested-by: Gaurav J <gaurav.jayaraj@couchbase.com>
1 parent e6f9d4f commit e990ce9

12 files changed

Lines changed: 1184 additions & 29 deletions

datastore/system/system.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ const KEYSPACE_NAME_AUS = "aus"
6060
const KEYSPACE_NAME_AUS_SETTINGS = "aus_settings"
6161
const KEYSPACE_NAME_AWR = "awr"
6262
const KEYSPACE_NAME_SETTINGS = "settings"
63+
const KEYSPACE_NAME_NATURAL_CHAT = "natural_chats"
6364

6465
const PRIMARY_INDEX_NAME = "#primary"
6566

Lines changed: 369 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,369 @@
1+
// Copyright 2026-Present Couchbase, Inc.
2+
//
3+
// Use of this software is governed by the Business Source License included
4+
// in the file licenses/BSL-Couchbase.txt. As of the Change Date specified
5+
// in that file, in accordance with the Business Source License, use of this
6+
// software will be governed by the Apache License, Version 2.0, included in
7+
// the file licenses/APL2.txt.
8+
9+
package system
10+
11+
import (
12+
"github.com/couchbase/query/datastore"
13+
"github.com/couchbase/query/distributed"
14+
"github.com/couchbase/query/errors"
15+
"github.com/couchbase/query/expression"
16+
"github.com/couchbase/query/expression/parser"
17+
"github.com/couchbase/query/natural"
18+
"github.com/couchbase/query/timestamp"
19+
"github.com/couchbase/query/value"
20+
)
21+
22+
type naturalchatsKeyspace struct {
23+
keyspaceBase
24+
indexer datastore.Indexer
25+
}
26+
27+
func (b *naturalchatsKeyspace) Count(context datastore.QueryContext) (int64, errors.Error) {
28+
var count int64
29+
natural.ForEachConversation(func(chatId string, entry *natural.ChatEntry) bool {
30+
count++
31+
return true
32+
}, nil)
33+
return count, nil
34+
}
35+
36+
func (b *naturalchatsKeyspace) Fetch(keys []string, keysMap map[string]value.AnnotatedValue, context datastore.QueryContext,
37+
subPath []string, projection []string, useSubDoc bool) errors.Errors { // Bulk key-value fetch from this keyspace
38+
39+
var creds distributed.Creds
40+
41+
userName := datastore.CredsString(context.Credentials())
42+
if userName == "" {
43+
creds = distributed.NO_CREDS
44+
} else {
45+
creds = distributed.Creds(userName)
46+
}
47+
48+
whoamI := distributed.RemoteAccess().WhoAmI()
49+
for _, key := range keys {
50+
node, localkey := distributed.RemoteAccess().SplitKey(key)
51+
if node == whoamI {
52+
rv := natural.GetConversation(localkey)
53+
if rv == nil {
54+
continue // no such chat, possibly removed
55+
}
56+
ce := rv.(*natural.ChatEntry)
57+
if userName != "" && ce.User != userName {
58+
continue
59+
}
60+
itemMap := natural.FormatChatEntry(ce)
61+
itemMap["node"] = whoamI
62+
item := value.NewAnnotatedValue(itemMap)
63+
item.SetId(key)
64+
65+
keysMap[key] = item
66+
} else {
67+
distributed.RemoteAccess().GetRemoteDoc(node, localkey, "natural_chats", "GET",
68+
func(doc map[string]interface{}) {
69+
doc["node"] = node
70+
item := value.NewAnnotatedValue(doc)
71+
item.SetId(key)
72+
73+
keysMap[key] = item
74+
},
75+
func(warn errors.Error) {
76+
if !warn.HasCause(errors.W_SYSTEM_REMOTE_NODE_NOT_FOUND) {
77+
context.Warning(warn)
78+
}
79+
}, creds, "", nil)
80+
}
81+
82+
}
83+
return nil
84+
}
85+
86+
func (b *naturalchatsKeyspace) Delete(deletes value.Pairs, context datastore.QueryContext, preserveMutations bool) (
87+
int, value.Pairs, errors.Errors) {
88+
89+
var creds distributed.Creds
90+
91+
userName := credsFromContext(context)
92+
if userName == "" {
93+
creds = distributed.NO_CREDS
94+
} else {
95+
creds = distributed.Creds(userName)
96+
}
97+
98+
whoAmI := distributed.RemoteAccess().WhoAmI()
99+
var err errors.Error
100+
for i, pair := range deletes {
101+
name := pair.Name
102+
node, localKey := distributed.RemoteAccess().SplitKey(name)
103+
if node == whoAmI {
104+
c := natural.GetConversation(localKey)
105+
if c != nil {
106+
if ce, ok := c.(*natural.ChatEntry); ok {
107+
if userName != "" && ce.User != userName {
108+
continue
109+
}
110+
ce.Lock()
111+
if ce.Removed {
112+
// already removed, possibly by another routine.
113+
ce.Unlock()
114+
continue
115+
}
116+
natural.DeleteConversation(localKey)
117+
ce.Removed = true
118+
ce.Unlock()
119+
}
120+
} else {
121+
err = errors.NewNaturalLanguageRequestError(errors.E_NL_NO_SUCH_CHAT, localKey)
122+
}
123+
} else {
124+
// remote entry
125+
distributed.RemoteAccess().GetRemoteDoc(node, localKey, "natural_chats", "DELETE", nil,
126+
func(warn errors.Error) {
127+
if !warn.HasCause(errors.W_SYSTEM_REMOTE_NODE_NOT_FOUND) {
128+
context.Warning(warn)
129+
}
130+
},
131+
creds, "", nil)
132+
}
133+
134+
if err != nil {
135+
errs := errors.Errors{err}
136+
if preserveMutations {
137+
deleted := make([]value.Pair, i)
138+
if i > 0 {
139+
copy(deleted, deletes[0:i])
140+
}
141+
return i, deleted, errs
142+
} else {
143+
return i, nil, errs
144+
}
145+
}
146+
}
147+
148+
if preserveMutations {
149+
return len(deletes), deletes, nil
150+
} else {
151+
return len(deletes), nil, nil
152+
}
153+
}
154+
155+
func (b *naturalchatsKeyspace) Id() string {
156+
return b.name
157+
}
158+
159+
func (b *naturalchatsKeyspace) Indexer(name datastore.IndexType) (datastore.Indexer, errors.Error) {
160+
return b.indexer, nil
161+
}
162+
163+
func (b *naturalchatsKeyspace) Indexers() ([]datastore.Indexer, errors.Error) {
164+
return []datastore.Indexer{b.indexer}, nil
165+
}
166+
167+
func (b *naturalchatsKeyspace) Name() string {
168+
return b.name
169+
}
170+
171+
func (b *naturalchatsKeyspace) NamespaceId() string {
172+
return b.namespace.Id()
173+
}
174+
175+
func (b *naturalchatsKeyspace) Release(close bool) {
176+
}
177+
178+
func (b *naturalchatsKeyspace) Size(context datastore.QueryContext) (int64, errors.Error) {
179+
return -1, nil
180+
}
181+
182+
func NewNaturalChatsKeyspace(p *namespace) (*naturalchatsKeyspace, errors.Error) {
183+
b := new(naturalchatsKeyspace)
184+
setKeyspaceBase(&b.keyspaceBase, p, KEYSPACE_NAME_NATURAL_CHAT)
185+
186+
primary := &naturalChatIndex{
187+
name: PRIMARY_INDEX_NAME,
188+
primary: true,
189+
keyspace: b,
190+
}
191+
b.indexer = newSystemIndexer(b, primary)
192+
setIndexBase(&primary.indexBase, b.indexer)
193+
194+
expr, err := parser.Parse(`node`)
195+
196+
if err == nil {
197+
key := expression.Expressions{expr}
198+
nodes := &naturalChatIndex{
199+
name: "#nodes",
200+
keyspace: b,
201+
primary: false,
202+
idxKey: key,
203+
}
204+
setIndexBase(&nodes.indexBase, b.indexer)
205+
b.indexer.(*systemIndexer).AddIndex(nodes.name, nodes)
206+
} else {
207+
return nil, errors.NewSystemDatastoreError(err, "")
208+
}
209+
210+
return b, nil
211+
}
212+
213+
type naturalChatIndex struct {
214+
indexBase
215+
name string
216+
primary bool
217+
keyspace *naturalchatsKeyspace
218+
idxKey expression.Expressions
219+
}
220+
221+
func (pi *naturalChatIndex) Condition() expression.Expression {
222+
return nil
223+
}
224+
225+
func (pi *naturalChatIndex) Drop(requestId string) errors.Error {
226+
return errors.NewSystemIdxNoDropError(nil, "")
227+
}
228+
229+
func (pi *naturalChatIndex) Id() string {
230+
return pi.name
231+
}
232+
233+
func (pi *naturalChatIndex) IsPrimary() bool {
234+
return pi.primary
235+
}
236+
237+
func (pi *naturalChatIndex) KeyspaceId() string {
238+
return pi.keyspace.Id()
239+
}
240+
241+
func (pi *naturalChatIndex) Name() string {
242+
return pi.name
243+
}
244+
245+
func (pi *naturalChatIndex) RangeKey() expression.Expressions {
246+
return pi.idxKey
247+
}
248+
249+
func (pi *naturalChatIndex) Scan(requestId string, span *datastore.Span, distinct bool, limit int64,
250+
cons datastore.ScanConsistency, vector timestamp.Vector, conn *datastore.IndexConnection) {
251+
if span == nil || pi.primary {
252+
pi.ScanEntries(requestId, limit, cons, vector, conn)
253+
} else {
254+
255+
defer conn.Sender().Close()
256+
257+
var dsEntry *datastore.IndexEntry
258+
259+
whoAmI := distributed.RemoteAccess().WhoAmI()
260+
spanEvaluator, err := compileSpan(span)
261+
if err != nil {
262+
conn.Error(err)
263+
return
264+
}
265+
266+
process := func(chatId string, entry *natural.ChatEntry) bool {
267+
dsEntry = &datastore.IndexEntry{
268+
PrimaryKey: distributed.RemoteAccess().MakeKey(whoAmI, chatId),
269+
EntryKey: value.Values{value.NewValue(whoAmI)},
270+
}
271+
return true
272+
}
273+
274+
send := func() bool {
275+
return sendSystemKey(conn, dsEntry)
276+
}
277+
278+
idx := spanEvaluator.isEquals()
279+
if idx >= 0 {
280+
if spanEvaluator.key(idx) == whoAmI {
281+
natural.ForEachConversation(process, send)
282+
} else {
283+
nodes := []string{decodeNodeName(spanEvaluator.key(idx))}
284+
distributed.RemoteAccess().GetRemoteKeys(nodes, "natural_chats", func(id string) bool {
285+
n, _ := distributed.RemoteAccess().SplitKey(id)
286+
indexEntry := datastore.IndexEntry{
287+
PrimaryKey: id,
288+
EntryKey: value.Values{value.NewValue(n)},
289+
}
290+
return sendSystemKey(conn, &indexEntry)
291+
}, func(warn errors.Error) {
292+
if !warn.HasCause(errors.W_SYSTEM_REMOTE_NODE_NOT_FOUND) {
293+
conn.Warning(warn)
294+
}
295+
}, distributed.NO_CREDS, "")
296+
}
297+
} else {
298+
nodes := distributed.RemoteAccess().GetNodeNames()
299+
eligibleNodes := []string{}
300+
for _, node := range nodes {
301+
if spanEvaluator.evaluate(node) {
302+
if node == whoAmI {
303+
natural.ForEachConversation(process, send)
304+
} else {
305+
eligibleNodes = append(eligibleNodes, node)
306+
}
307+
}
308+
}
309+
310+
if len(eligibleNodes) > 0 {
311+
distributed.RemoteAccess().GetRemoteKeys(eligibleNodes, "natural_chats", func(id string) bool {
312+
n, _ := distributed.RemoteAccess().SplitKey(id)
313+
indexEntry := datastore.IndexEntry{
314+
PrimaryKey: id,
315+
EntryKey: value.Values{value.NewValue(n)},
316+
}
317+
return sendSystemKey(conn, &indexEntry)
318+
},
319+
func(warn errors.Error) {
320+
if !warn.HasCause(errors.W_SYSTEM_REMOTE_NODE_NOT_FOUND) {
321+
conn.Warning(warn)
322+
}
323+
}, distributed.NO_CREDS, "")
324+
}
325+
}
326+
}
327+
}
328+
329+
func (pi *naturalChatIndex) ScanEntries(requestId string, limit int64, cons datastore.ScanConsistency,
330+
vector timestamp.Vector, conn *datastore.IndexConnection) {
331+
332+
defer conn.Sender().Close()
333+
var entry *datastore.IndexEntry
334+
335+
whoAmI := distributed.RemoteAccess().WhoAmI()
336+
natural.ForEachConversation(
337+
func(chatId string, elem *natural.ChatEntry) bool {
338+
entry = &datastore.IndexEntry{PrimaryKey: distributed.RemoteAccess().MakeKey(whoAmI, chatId)}
339+
return true
340+
},
341+
func() bool {
342+
return sendSystemKey(conn, entry)
343+
})
344+
345+
distributed.RemoteAccess().GetRemoteKeys([]string{}, "natural_chats", func(id string) bool {
346+
indexEntry := datastore.IndexEntry{PrimaryKey: id}
347+
return sendSystemKey(conn, &indexEntry)
348+
}, func(warn errors.Error) {
349+
if !warn.HasCause(errors.W_SYSTEM_REMOTE_NODE_NOT_FOUND) {
350+
conn.Warning(warn)
351+
}
352+
}, distributed.NO_CREDS, "")
353+
}
354+
355+
func (pi *naturalChatIndex) SeekKey() expression.Expressions {
356+
return pi.idxKey
357+
}
358+
359+
func (pi *naturalChatIndex) State() (state datastore.IndexState, msg string, err errors.Error) {
360+
return datastore.ONLINE, "", nil
361+
}
362+
363+
func (pi *naturalChatIndex) Statistics(requestId string, span *datastore.Span) (datastore.Statistics, errors.Error) {
364+
return nil, nil
365+
}
366+
367+
func (pi *naturalChatIndex) Type() datastore.IndexType {
368+
return datastore.SYSTEM
369+
}

datastore/system/system_namespace.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -315,6 +315,12 @@ func (p *namespace) loadKeyspaces() (e errors.Error) {
315315
}
316316
registerKeyspace(p, settings)
317317

318+
nlChat, e := NewNaturalChatsKeyspace(p)
319+
if e != nil {
320+
return e
321+
}
322+
registerKeyspace(p, nlChat)
323+
318324
return nil
319325
}
320326

0 commit comments

Comments
 (0)