Skip to content

Commit 031a3ab

Browse files
authored
Merge pull request #41 from devchat-ai/hanle-workflows-errors
Hanle workflows errors
2 parents 44061e0 + 3e39ed0 commit 031a3ab

7 files changed

Lines changed: 114 additions & 71 deletions

File tree

src/main/kotlin/ai/devchat/cli/DevChatWrapper.kt

Lines changed: 78 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,10 @@ import com.alibaba.fastjson.JSONArray
1010
import com.alibaba.fastjson.JSONObject
1111
import com.intellij.util.containers.addIfNotNull
1212
import kotlinx.coroutines.*
13+
import kotlinx.coroutines.channels.SendChannel
14+
import kotlinx.coroutines.channels.actor
15+
import kotlinx.coroutines.selects.select
16+
import kotlinx.coroutines.selects.whileSelect
1317
import java.io.File
1418
import java.io.IOException
1519

@@ -33,16 +37,13 @@ suspend fun executeCommand(
3337
command: List<String>,
3438
workDir: String?,
3539
env: Map<String, String>,
36-
onOutputLine: (String) -> Unit,
37-
onErrorLine: (String) -> Unit
38-
): Int {
40+
): Process {
3941
val processBuilder = ProcessBuilder(command)
4042
workDir?.let {processBuilder.directory(File(workDir))}
4143
env.forEach { (key, value) -> processBuilder.environment()[key] = value}
42-
val process = withContext(Dispatchers.IO) {
44+
return withContext(Dispatchers.IO) {
4345
processBuilder.start()
4446
}
45-
return process.await(onOutputLine, onErrorLine)
4647
}
4748

4849
class Command(val cmd: MutableList<String> = mutableListOf()) {
@@ -62,34 +63,38 @@ class Command(val cmd: MutableList<String> = mutableListOf()) {
6263
return this
6364
}
6465

65-
fun exec(
66-
flags: List<Pair<String, String?>> = listOf(),
67-
callback: ((String) -> Unit)? = null,
68-
onFinish: ((Int) -> Unit)? = null
69-
): String? {
66+
private fun prepare(flags: List<Pair<String, String?>> = listOf()): List<String> {
7067
val args = flags.fold(mutableListOf<String>()) { acc, (name, value) ->
7168
acc.add("--$name")
7269
acc.addIfNotNull(value)
7370
acc
7471
}
75-
return try {
76-
callback?.let {
77-
execAsync(cmd + args, callback, DevChatNotifier::stickyError, onFinish);
78-
""
79-
} ?: exec(cmd + args)
80-
} catch (e: Exception) {
81-
Log.warn("Failed to run command $cmd: ${e.message}")
82-
throw CommandExecutionException("Failed to run command $cmd: ${e.message}")
83-
}
72+
return cmd + args
8473
}
8574

86-
private fun exec(commands: List<String>): String {
87-
Log.info("Executing command: ${commands.joinToString(" ")}}")
75+
private fun toString(flags: List<Pair<String, String?>>): String {
76+
val preparedCommand = prepare(flags)
77+
return env.entries.joinToString(" ") { (k, v) ->
78+
val masked = if (k == "OPENAI_API_KEY") v.mapIndexed { i, c ->
79+
if (i in 7 until v.length - 7) '*' else c
80+
}.joinToString("") else v
81+
"$k=$masked"
82+
} + " " + preparedCommand.joinToString(" ")
83+
}
84+
85+
fun exec(flags: List<Pair<String, String?>> = listOf()): String {
86+
val preparedCommand = prepare(flags)
87+
val commandStr = toString(flags)
88+
Log.info("Executing command: $commandStr")
8889
return try {
8990
val outputLines: MutableList<String> = mutableListOf()
9091
val errorLines: MutableList<String> = mutableListOf()
9192
val exitCode = runBlocking {
92-
executeCommand(commands, ProjectUtils.project?.basePath, env, outputLines::add, errorLines::add)
93+
executeCommand(
94+
preparedCommand,
95+
ProjectUtils.project?.basePath,
96+
env
97+
).await(outputLines::add, errorLines::add)
9398
}
9499
val errors = errorLines.joinToString("\n")
95100

@@ -98,28 +103,47 @@ class Command(val cmd: MutableList<String> = mutableListOf()) {
98103
} else {
99104
outputLines.joinToString("\n")
100105
}
101-
} catch (e: IOException) {
102-
Log.warn("Failed to execute command: $commands, Exception: $e")
103-
throw e
106+
} catch (e: Exception) {
107+
val msg = "Failed to execute command `$commandStr`: $e"
108+
Log.warn(msg)
109+
throw CommandExecutionException(msg)
104110
}
105111
}
106112

107-
private fun execAsync(
108-
commands: List<String>,
113+
@OptIn(ObsoleteCoroutinesApi::class, ExperimentalCoroutinesApi::class)
114+
fun execAsync(
115+
flags: List<Pair<String, String?>>,
109116
onOutput: (String) -> Unit,
110117
onError: (String) -> Unit = Log::warn,
111118
onFinish: ((Int) -> Unit)? = null,
112-
): Job {
113-
Log.info("Executing command: ${commands.joinToString(" ")}}")
119+
): SendChannel<String> {
120+
val preparedCommand = prepare(flags)
121+
val commandStr = toString(flags)
122+
Log.info("Executing command: $commandStr")
114123
val exceptionHandler = CoroutineExceptionHandler { _, exception ->
115-
val msg = "Failed to execute command: $commands, Exception: $exception"
124+
val msg = "Failed to execute command `$commandStr`: $exception"
116125
Log.warn(msg)
117126
onError(msg)
118127
}
119-
val cmdScope = CoroutineScope(SupervisorJob() + Dispatchers.Default)
120-
121-
return cmdScope.launch(exceptionHandler) {
122-
val exitCode = executeCommand(commands, ProjectUtils.project?.basePath, env, onOutput, onError)
128+
return CoroutineScope(
129+
SupervisorJob() + Dispatchers.Default + exceptionHandler
130+
).actor {
131+
val process = executeCommand(preparedCommand, ProjectUtils.project?.basePath, env)
132+
val deferred = async {process.await(onOutput, onError)}
133+
var exitCode = 0
134+
whileSelect {
135+
deferred.onAwait {
136+
exitCode = it
137+
false
138+
}
139+
channel.onReceive {msg ->
140+
process.outputStream.use {
141+
it.write(msg.toByteArray())
142+
}
143+
Log.info("Input wrote: $msg")
144+
true
145+
}
146+
}
123147
onFinish?.let { onFinish(exitCode) }
124148
if (exitCode != 0) {
125149
throw CommandExecutionException("Command failure with exit Code: $exitCode")
@@ -156,49 +180,47 @@ class DevChatWrapper(
156180
Log.info("api_key: ${it.substring(0, 5)}...${it.substring(it.length - 4)}")
157181
}
158182
env["PYTHONPATH"] = PathUtils.pythonPath
183+
env["command_python"] = PathUtils.pythonForWorkflows
159184
return env
160185
}
161186

162-
val runCmd = Command(baseCommand).subcommand("run")::exec
163-
val logCmd = Command(baseCommand).subcommand("log")::exec
164-
val topicCmd = Command(baseCommand).subcommand("topic")::exec
165-
val routeCmd = Command(baseCommand).subcommand("route")::exec
166-
167-
val run get() = { flags: List<Pair<String, String?>> -> runCmd(flags, null, null)}
168-
val log get() = { flags: List<Pair<String, String?>> -> logCmd(flags, null, null)}
169-
val topic get() = { flags: List<Pair<String, String?>> -> topicCmd(flags, null, null)}
187+
val run = Command(baseCommand).subcommand("run")::exec
188+
val log = Command(baseCommand).subcommand("log")::exec
189+
val topic = Command(baseCommand).subcommand("topic")::exec
190+
val routeCmd = Command(baseCommand).subcommand("route")::execAsync
170191

171192
fun route(
172193
flags: List<Pair<String, String?>>,
173194
message: String,
174-
callback: ((String) -> Unit)?,
175-
onFinish: ((Int) -> Unit)?
195+
callback: (String) -> Unit,
196+
onError: (String) -> Unit = DevChatNotifier::stickyError,
197+
onFinish: ((Int) -> Unit)? = null
176198
) {
177199
when {
178200
apiKey.isNullOrEmpty() -> DevChatNotifier.stickyError("Please config your API key first.")
179201
!apiKey!!.startsWith("DC.") -> DevChatNotifier.stickyError("Invalid API key format.")
180-
else -> routeCmd(
202+
else -> activeChannel = routeCmd(
181203
flags
182204
+ (if (flags.any {
183205
it.first == "model" && !it.second.isNullOrEmpty()
184206
}) emptyList() else listOf("model" to defaultModel))
185207
+ listOf("" to message),
186208
callback,
209+
onError,
187210
onFinish
188211
)
189212
}
190213
}
191214

192215
val topicList: JSONArray get() = try {
193-
val r = topic(mutableListOf("list" to null)) ?: "[]"
216+
val r = topic(mutableListOf("list" to null))
194217
JSON.parseArray(r)
195218
} catch (e: Exception) {
196219
Log.warn("Error list topics: $e")
197220
JSONArray()
198221
}
199222
val commandList: JSONArray get() = try {
200-
val r = run(mutableListOf("list" to null)) ?: "[]"
201-
JSON.parseArray(r)
223+
JSON.parseArray(run(mutableListOf("list" to null)))
202224
} catch (e: Exception) {
203225
Log.warn("Error list commands: $e")
204226
JSONArray()
@@ -207,11 +229,10 @@ class DevChatWrapper(
207229
val logTopic: (String, Int?) -> JSONArray get() = {topic: String, maxCount: Int? ->
208230
val num: Int = maxCount ?: DEFAULT_LOG_MAX_COUNT
209231
try {
210-
val r = log(mutableListOf(
232+
JSON.parseArray(log(mutableListOf(
211233
"topic" to topic,
212234
"max-count" to num.toString()
213-
)) ?: "[]"
214-
JSON.parseArray(r)
235+
)))
215236
} catch (e: Exception) {
216237
Log.warn("Error log topic: $e")
217238
JSONArray()
@@ -221,22 +242,24 @@ class DevChatWrapper(
221242
try {
222243
log(listOf("insert" to item))
223244
} catch (e: Exception) {
224-
val msg = "Error insert log: $e"
225-
Log.warn(msg)
226-
DevChatNotifier.error(msg)
245+
Log.warn("Error insert log: $e")
227246
}
228247
}
229248

230249
val logLast: () -> JSONObject? get() = {
231250
try {
232251
log(mutableListOf(
233252
"max-count" to "1"
234-
))?.let {
253+
)).let {
235254
JSON.parseArray(it).getJSONObject(0)
236255
}
237256
} catch (e: Exception) {
238257
Log.warn("Error log topic: $e")
239258
null
240259
}
241260
}
261+
companion object {
262+
var activeChannel: SendChannel<String>? = null
263+
}
264+
242265
}

src/main/kotlin/ai/devchat/common/PathUtils.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import java.nio.file.attribute.BasicFileAttributes
77
object PathUtils {
88
val workPath: String = Paths.get(System.getProperty("user.home"), ".chat").toString()
99
var pythonCommand: String = "python"
10+
var pythonForWorkflows: String = "python"
1011
val pythonPath: String = Paths.get(workPath, "site-packages").toString()
1112

1213
fun copyResourceDirToPath(resourceDir: String, outputPath: String) {

src/main/kotlin/ai/devchat/devchat/ActionHandlerFactory.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ class ActionHandlerFactory {
2525
DevChatActions.COMMIT_CODE_REQUEST to CommitCodeRequestHandler::class,
2626
DevChatActions.GET_SETTING_REQUEST to GetSettingRequestHandler::class,
2727
DevChatActions.UPDATE_SETTING_REQUEST to UpdateSettingRequestHandler::class,
28+
DevChatActions.INPUT_REQUEST to InputRequestHandler::class,
2829
DevChatActions.SHOW_SETTING_DIALOG_REQUEST to ShowSettingDialogRequestHandler::class,
2930
DevChatActions.DELETE_LAST_CONVERSATION_REQUEST to DeleteLastConversationRequestHandler::class,
3031
DevChatActions.DELETE_TOPIC_REQUEST to DeleteTopicRequestHandler::class,

src/main/kotlin/ai/devchat/devchat/DevChatActions.kt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@ object DevChatActions {
3838
const val GET_SETTING_RESPONSE = "getSetting/response"
3939
const val UPDATE_SETTING_REQUEST = "updateSetting/request"
4040
const val UPDATE_SETTING_RESPONSE = "updateSetting/response"
41+
const val INPUT_REQUEST = "input/request"
42+
const val INPUT_RESPONSE = "input/response"
4143
const val SHOW_SETTING_DIALOG_REQUEST = "showSettingDialog/request"
4244
const val DELETE_LAST_CONVERSATION_REQUEST = "deleteLastConversation/request"
4345
const val DELETE_LAST_CONVERSATION_RESPONSE = "deleteLastConversation/response"
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package ai.devchat.devchat.handler
2+
3+
import ai.devchat.cli.DevChatWrapper
4+
import ai.devchat.devchat.BaseActionHandler
5+
import ai.devchat.devchat.DevChatActions
6+
import com.alibaba.fastjson.JSONObject
7+
import kotlinx.coroutines.runBlocking
8+
9+
class InputRequestHandler(metadata: JSONObject?, payload: JSONObject?) : BaseActionHandler(metadata, payload) {
10+
override val actionName: String = DevChatActions.INPUT_RESPONSE
11+
12+
override fun action() {
13+
runBlocking {
14+
DevChatWrapper.activeChannel?.send(payload!!.getString("data"))
15+
}
16+
send()
17+
}
18+
}

src/main/kotlin/ai/devchat/devchat/handler/SendMessageRequestHandler.kt

Lines changed: 13 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -64,20 +64,13 @@ class SendMessageRequestHandler(metadata: JSONObject?, payload: JSONObject?) : B
6464
wrapper.route(
6565
flags,
6666
message,
67-
{line ->
67+
callback = {line ->
6868
response.update(line)
6969
promptCallback(response)
7070
},
71-
{ _ ->
72-
insertLog(
73-
contextJSONs,
74-
model.takeIf { it.isNotEmpty() } ?: DevChatSettingsState.instance.defaultModel,
75-
message,
76-
response.message ?: "",
77-
parent
78-
)
79-
val lastRecord = wrapper.logLast()
80-
response.update("prompt ${lastRecord!!["hash"]}")
71+
onFinish = { _ ->
72+
val record = insertLog(contextJSONs, model, message, response.message, parent)
73+
response.update("prompt ${record["hash"]}")
8174
promptCallback(response)
8275

8376
val currentTopic = ActiveConversation.topic ?: response.promptHash!!
@@ -136,13 +129,14 @@ class SendMessageRequestHandler(metadata: JSONObject?, payload: JSONObject?) : B
136129

137130
private fun insertLog(
138131
contexts: List<String>?,
139-
model: String,
132+
model: String?,
140133
request: String,
141-
response: String,
134+
response: String?,
142135
parent: String?
143-
) {
144-
val item = mutableMapOf<String, Any?>(
145-
"model" to model,
136+
): JSONObject {
137+
val defaultModel = DevChatSettingsState.instance.defaultModel
138+
val item = mutableMapOf(
139+
"model" to if (model.isNullOrEmpty()) defaultModel else model,
146140
"messages" to listOf(
147141
mutableMapOf(
148142
"role" to "user",
@@ -163,6 +157,9 @@ class SendMessageRequestHandler(metadata: JSONObject?, payload: JSONObject?) : B
163157
)
164158
parent?.let {item.put("parent", parent)}
165159
wrapper.logInsert(JSONObject(item).toJSONString())
160+
val lastRecord = wrapper.logLast()
161+
Log.info("Log item inserted: ${lastRecord!!["hash"]}")
162+
return lastRecord
166163
}
167164

168165
}

src/main/kotlin/ai/devchat/idea/DevChatSetupThread.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ class DevChatSetupThread : Thread() {
5858
?.let {
5959
val workflowEnv = envManager.createEnv("devchat-commands", defaultPythonVersion)
6060
workflowEnv.installRequirements(it)
61+
PathUtils.pythonForWorkflows = workflowEnv.pythonCommand
6162
}
6263
}
6364

0 commit comments

Comments
 (0)