Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ Columns: NAME, WEBHOOK ID, WORKFLOWS, URL

### Schedule Commands

> **Note:** Schedule commands are only available with Orkes Conductor (Enterprise).
> **Note:** Schedule commands work against both OSS Conductor and Orkes Conductor. The OSS server must include the `scheduler` module — the default jar from `conductor-oss/conductor` (used by `conductor server start`) ships it. Custom OSS builds that omit the module will return 404 with a hint message.

| Command | Description | Required Args | Optional Flags | Example |
|---------|-------------|---------------|----------------|---------|
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ conductor task <command> [arguments] [flags]

### Schedule Commands

Manage workflow schedules (Enterprise only).
Manage workflow schedules. Works against both OSS Conductor and Orkes Conductor (the OSS server must include the `scheduler` module — the default jar started by `conductor server start` does).

```
conductor schedule <command> [arguments] [flags]
Expand Down
88 changes: 35 additions & 53 deletions cmd/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,10 +120,6 @@ var (
)

func listSchedules(cmd *cobra.Command, args []string) error {
if !isEnterpriseServer() {
return fmt.Errorf("Not supported in OSS Conductor")
}

schedulerClient := internal.GetSchedulerClient()
var workflowNameOpt optional.String
if len(args) == 1 {
Expand All @@ -132,7 +128,7 @@ func listSchedules(cmd *cobra.Command, args []string) error {
options := client.SchedulerResourceApiGetAllSchedulesOpts{WorkflowName: workflowNameOpt}
schedules, _, err := schedulerClient.GetAllSchedules(context.Background(), &options)
if err != nil {
return parseAPIError(err, "Failed to list schedules")
return parseSchedulerAPIError(err, "Failed to list schedules")
}

outputFormat, err := GetOutputFormat(cmd)
Expand Down Expand Up @@ -203,10 +199,6 @@ func listSchedules(cmd *cobra.Command, args []string) error {
}

func getSchedule(cmd *cobra.Command, args []string) error {
if !isEnterpriseServer() {
return fmt.Errorf("Not supported in OSS Conductor")
}

schedulerClient := internal.GetSchedulerClient()
if len(args) == 0 {
return cmd.Usage()
Expand All @@ -215,7 +207,7 @@ func getSchedule(cmd *cobra.Command, args []string) error {
for i := 0; i < len(args); i++ {
schedule, _, err := schedulerClient.GetSchedule(context.Background(), args[i])
if err != nil {
return parseAPIError(err, fmt.Sprintf("Failed to get schedule '%s'", args[i]))
return parseSchedulerAPIError(err, fmt.Sprintf("Failed to get schedule '%s'", args[i]))
}
bytes, _ := json.MarshalIndent(schedule, "", " ")
fmt.Println(string(bytes))
Expand All @@ -224,10 +216,6 @@ func getSchedule(cmd *cobra.Command, args []string) error {
}

func deleteSchedule(cmd *cobra.Command, args []string) error {
if !isEnterpriseServer() {
return fmt.Errorf("Not supported in OSS Conductor")
}

schedulerClient := internal.GetSchedulerClient()
if len(args) == 0 {
return cmd.Usage()
Expand All @@ -244,18 +232,14 @@ func deleteSchedule(cmd *cobra.Command, args []string) error {

_, _, err := schedulerClient.DeleteSchedule(context.Background(), name)
if err != nil {
return err
return parseSchedulerAPIError(err, fmt.Sprintf("Failed to delete schedule '%s'", name))
}
fmt.Printf("Schedule '%s' deleted successfully\n", name)
}
return nil
}

func pauseSchedule(cmd *cobra.Command, args []string) error {
if !isEnterpriseServer() {
return fmt.Errorf("Not supported in OSS Conductor")
}

schedulerClient := internal.GetSchedulerClient()
if len(args) == 0 {
return cmd.Usage()
Expand All @@ -264,17 +248,13 @@ func pauseSchedule(cmd *cobra.Command, args []string) error {
for i := 0; i < len(args); i++ {
_, _, err := schedulerClient.PauseSchedule(context.Background(), args[i])
if err != nil {
return err
return parseSchedulerAPIError(err, fmt.Sprintf("Failed to pause schedule '%s'", args[i]))
}
}
return nil
}

func resumeSchedule(cmd *cobra.Command, args []string) error {
if !isEnterpriseServer() {
return fmt.Errorf("Not supported in OSS Conductor")
}

schedulerClient := internal.GetSchedulerClient()
if len(args) == 0 {
return cmd.Usage()
Expand All @@ -283,21 +263,14 @@ func resumeSchedule(cmd *cobra.Command, args []string) error {
for i := 0; i < len(args); i++ {
_, _, err := schedulerClient.ResumeSchedule(context.Background(), args[i])
if err != nil {
return err
return parseSchedulerAPIError(err, fmt.Sprintf("Failed to resume schedule '%s'", args[i]))
}
}
return nil
}

func searchScheduledExecutions(cmd *cobra.Command, args []string) error {
if !isEnterpriseServer() {
return fmt.Errorf("Not supported in OSS Conductor")
}

schedulerClient := internal.GetSchedulerClient()
if len(args) == 0 {
return cmd.Usage()
}
count, _ := cmd.Flags().GetInt32("count")
if count > 1000 {
log.Info("count exceeds max allowed 1000. Will only show the first 1000 matching results")
Expand All @@ -323,16 +296,13 @@ func searchScheduledExecutions(cmd *cobra.Command, args []string) error {
Query: optional.NewString(query),
Sort: optional.NewString("startTime:DESC"),
}
for i := 0; i < len(args); i++ {
results, _, err := schedulerClient.SearchV2(context.Background(), &searchOpts)
if err != nil {
return err
}
items := results.Results
for _, item := range items {
execTime := time.UnixMilli(item.ExecutionTime).Format(time.UnixDate)
fmt.Println(strings.Join([]string{item.State, item.WorkflowName, execTime, item.WorkflowId, item.Reason}, ","))
}
results, _, err := schedulerClient.SearchV2(context.Background(), &searchOpts)
if err != nil {
return parseSchedulerAPIError(err, "Failed to search scheduled executions")
}
for _, item := range results.Results {
execTime := time.UnixMilli(item.ExecutionTime).Format(time.UnixDate)
fmt.Println(strings.Join([]string{item.State, item.WorkflowName, execTime, item.WorkflowId, item.Reason}, ","))
}
return nil
}
Expand All @@ -345,10 +315,6 @@ func updateSchedule(cmd *cobra.Command, args []string) error {
return createOrUpdateSchedule(true, cmd, args)
}
func createOrUpdateSchedule(update bool, cmd *cobra.Command, args []string) error {
if !isEnterpriseServer() {
return fmt.Errorf("Not supported in OSS Conductor")
}

schedulerClient := internal.GetSchedulerClient()
var request model.SaveScheduleRequest
var err error
Expand Down Expand Up @@ -426,16 +392,20 @@ func createOrUpdateSchedule(update bool, cmd *cobra.Command, args []string) erro
return fmt.Errorf("failed to parse schedule JSON: %w", err)
}
}
// Check if a schedule with this name already exists. OSS Conductor returns
// 200 with an empty body when the schedule does not exist; Orkes returns 404.
// Treat both as "does not exist".
var exists bool
//Let's check if there is an existing schedule
_, _, err = schedulerClient.GetSchedule(context.Background(), request.Name)
existing, _, err := schedulerClient.GetSchedule(context.Background(), request.Name)
if err != nil {
var swaggerErr client.GenericSwaggerError
if errors.As(err, &swaggerErr) && swaggerErr.StatusCode() == 404 {
exists = false
} else {
return parseSchedulerAPIError(err, fmt.Sprintf("Failed to look up schedule '%s'", request.Name))
}
} else {
exists = true
exists = existing.Name != ""
}

if update && !exists {
Expand All @@ -444,13 +414,10 @@ func createOrUpdateSchedule(update bool, cmd *cobra.Command, args []string) erro
if !update && exists {
return errors.New("a schedule already exists by this name " + request.Name + ". " +
"(hint: use update command to update the existing schedule) ")
}
if update {

}
_, _, err = schedulerClient.SaveSchedule(context.Background(), request)
if err != nil {
return err
return parseSchedulerAPIError(err, fmt.Sprintf("Failed to save schedule '%s'", request.Name))
}
return nil
}
Expand All @@ -477,6 +444,7 @@ func init() {
updateSchedulerCmd.Flags().Int32("version", 0, "Workflow version (0 for latest)")

searchSchedulerCmd.Flags().Int32P("count", "c", 10, "No of workflows to return (max 1000)")
searchSchedulerCmd.Flags().StringP("workflow", "w", "", "Filter by workflow name")
searchSchedulerCmd.Flags().StringP("status", "s", "", "Filter by status one of (COMPLETED, FAILED, PAUSED, RUNNING, TERMINATED, TIMED_OUT)")

schedulerCmd.AddCommand(
Expand All @@ -490,3 +458,17 @@ func init() {
searchSchedulerCmd,
)
}

// parseSchedulerAPIError wraps parseAPIError and adds a hint when the server
// returns 404 for a scheduler endpoint — that typically means the scheduler
// module is not enabled on the target server (some custom OSS builds omit it).
func parseSchedulerAPIError(err error, defaultMsg string) error {
var swaggerErr client.GenericSwaggerError
if errors.As(err, &swaggerErr) && swaggerErr.StatusCode() == 404 {
return fmt.Errorf("%s: scheduler API returned 404.\n"+
"Hint: schedules require the scheduler module on the Conductor server. "+
"The default OSS jar (conductor-oss/conductor) and Orkes Conductor both ship it; "+
"custom OSS builds may not. Verify with: curl %s/scheduler/schedules", defaultMsg, url)
}
return parseAPIError(err, defaultMsg)
}
Loading