Skip to content

Commit 7d6f423

Browse files
count users (#679)
* count users * fix: Add users-count handler and update response. Update Makefile and create onboarding worker air.toml * fix phony * refactor --------- Co-authored-by: stefanskoricdev <stephanskoric@gmail.com>
1 parent 154f09b commit 7d6f423

9 files changed

Lines changed: 194 additions & 40 deletions

File tree

Makefile

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,16 @@ watch-oss-api:
33
@export $$(cat .env | xargs) && \
44
./bin/air -c oss-api.air.toml
55

6+
.PHONY: watch-internal-api
67
watch-internal-api:
78
@export $$(cat .env | xargs) && \
89
./bin/air -c internal-api.air.toml
910

11+
.PHONY: watch-onboarding-worker
12+
watch-onboarding-worker:
13+
@export $$(cat .env | xargs) && \
14+
./bin/air -c onboarding-worker.air.toml
15+
1016
.ONESHELL:
1117
setup:
1218
@curl -sSfL https://raw.githubusercontent.com/cosmtrek/air/master/install.sh | sh -s

cmd/internal-api/main.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,12 @@ import (
1010
"time"
1111

1212
"github.com/dxta-dev/app/internal/internal_api/handler"
13+
"github.com/dxta-dev/app/internal/onboarding"
1314
"github.com/dxta-dev/app/internal/util"
1415
"github.com/go-chi/chi/v5"
1516
"github.com/go-chi/chi/v5/middleware"
1617
"github.com/go-chi/jwtauth/v5"
18+
"go.temporal.io/sdk/client"
1719

1820
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
1921
instrruntime "go.opentelemetry.io/contrib/instrumentation/runtime"
@@ -50,6 +52,11 @@ func initTracer(ctx context.Context, res *sdkresource.Resource) (*sdktrace.Trace
5052
}
5153

5254
func main() {
55+
cfg, err := onboarding.LoadConfig()
56+
if err != nil {
57+
log.Fatalln("Failed to load configuration:", err)
58+
}
59+
5360
isEndpointProvided := os.Getenv("OTEL_EXPORTER_OTLP_ENDPOINT") != "" ||
5461
os.Getenv("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT") != ""
5562

@@ -112,6 +119,18 @@ func main() {
112119
srv.IdleTimeout = 30 * time.Second
113120
}
114121

122+
temporalClient, err := client.Dial(client.Options{
123+
HostPort: cfg.TemporalHostPort,
124+
Namespace: cfg.TemporalOnboardingNamespace,
125+
})
126+
if err != nil {
127+
log.Fatalf("Unable to create Temporal client: %v", err)
128+
}
129+
130+
defer temporalClient.Close()
131+
132+
usersHandler := handler.NewUsers(temporalClient, *cfg)
133+
115134
r.Route("/tenant", func(r chi.Router) {
116135
if os.Getenv("ENABLE_JWT_AUTH") == "true" {
117136
pubKey, _ := util.GetRawPublicKey()
@@ -134,6 +153,12 @@ func main() {
134153
w.Write([]byte(`OK`))
135154
})
136155

156+
r.Get("/ready", func(w http.ResponseWriter, r *http.Request) {
157+
w.Write([]byte(`OK`))
158+
})
159+
160+
r.Get("/users-count", usersHandler.UsersCount)
161+
137162
go func() {
138163
log.Printf("Listening on %s\n", srv.Addr)
139164
if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {

cmd/onboarding-worker/main.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,19 +19,24 @@ func main() {
1919

2020
temporalClient, err := client.Dial(client.Options{
2121
HostPort: cfg.TemporalHostPort,
22-
Namespace: cfg.TemporalNamespace,
22+
Namespace: cfg.TemporalOnboardingNamespace,
2323
})
2424
if err != nil {
2525
log.Fatalln("Unable to create Temporal client", err)
2626
}
2727
defer temporalClient.Close()
2828

29-
err = onboarding.RegisterNamespace(context.Background(), cfg.TemporalHostPort, cfg.TemporalNamespace, 30)
29+
err = onboarding.RegisterNamespace(
30+
context.Background(),
31+
cfg.TemporalHostPort,
32+
cfg.TemporalOnboardingNamespace,
33+
30,
34+
)
3035
if err != nil {
3136
log.Fatalln("Failed to register Temporal namespace:", err)
3237
}
3338

34-
w := worker.New(temporalClient, cfg.TemporalQueueName, worker.Options{})
39+
w := worker.New(temporalClient, cfg.TemporalOnboardingQueueName, worker.Options{})
3540

3641
w.RegisterWorkflow(workflow.CountUsersWorkflow)
3742
w.RegisterActivity(activity.CountUsersActivity)

cmd/oss-api/main.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,14 @@ func main() {
154154
r.Get("/time-to-merge/{org}/{repo}", handler.TimeToMergeHandler)
155155
r.Get("/small-mrs/{org}/{repo}", handler.SmallMRsHandler)
156156

157+
r.Get("/health", func(w http.ResponseWriter, r *http.Request) {
158+
w.Write([]byte(`OK`))
159+
})
160+
161+
r.Get("/ready", func(w http.ResponseWriter, r *http.Request) {
162+
w.Write([]byte(`OK`))
163+
})
164+
157165
go func() {
158166
log.Printf("Listening on %s\n", srv.Addr)
159167
if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
package handler
2+
3+
import (
4+
"encoding/json"
5+
"errors"
6+
"fmt"
7+
"log"
8+
"net/http"
9+
10+
"github.com/dxta-dev/app/internal/onboarding"
11+
"github.com/dxta-dev/app/internal/onboarding/workflow"
12+
"github.com/dxta-dev/app/internal/util"
13+
"go.temporal.io/sdk/client"
14+
)
15+
16+
type UsersCountResponse struct {
17+
Count int `json:"count"`
18+
}
19+
20+
type Users struct {
21+
temporalClient client.Client
22+
config onboarding.Config
23+
}
24+
25+
func NewUsers(temporalClient client.Client, config onboarding.Config) *Users {
26+
return &Users{
27+
temporalClient: temporalClient,
28+
config: config,
29+
}
30+
}
31+
32+
func (u *Users) UsersCount(w http.ResponseWriter, r *http.Request) {
33+
out, err := workflow.ExecuteCountUsersWorkflow(r.Context(), u.temporalClient, u.config)
34+
if err != nil {
35+
log.Fatal(errors.Unwrap(err))
36+
}
37+
38+
w.Header().Set("Content-Type", "application/json")
39+
w.WriteHeader(http.StatusOK)
40+
if err := json.NewEncoder(w).Encode(UsersCountResponse{Count: out}); err != nil {
41+
fmt.Printf("Issue while formatting response. Error: %s", err.Error())
42+
util.JSONError(
43+
w,
44+
util.ErrorParam{Error: "Internal Server Error"},
45+
http.StatusInternalServerError,
46+
)
47+
return
48+
}
49+
}

internal/onboarding/config.go

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -7,35 +7,35 @@ import (
77
)
88

99
type Config struct {
10-
TemporalHostPort string
11-
TemporalNamespace string
12-
TemporalQueueName string
13-
UsersDSN string
10+
TemporalHostPort string
11+
TemporalOnboardingNamespace string
12+
TemporalOnboardingQueueName string
13+
UsersDSN string
1414
}
1515

1616
func LoadConfig() (*Config, error) {
17-
var hostport, namespace, taskQueue, usersDSN string
17+
var hostport, onboardingNamespace, onboardingTaskQueue, usersDSN string
1818

1919
if hostport = os.Getenv("TEMPORAL_HOSTPORT"); hostport == "" {
2020
log.Println("TEMPORAL_HOSTPORT not set; using default")
2121
}
2222

23-
if namespace = os.Getenv("TEMPORAL_NAMESPACE"); namespace == "" {
24-
return nil, errors.New("TEMPORAL_NAMESPACE is not defined")
23+
if onboardingNamespace = os.Getenv("TEMPORAL_ONBOARDING_NAMESPACE"); onboardingNamespace == "" {
24+
return nil, errors.New("TEMPORAL_ONBOARDING_NAMESPACE is not defined")
2525
}
2626

27-
if taskQueue = os.Getenv("TEMPORAL_TASK_QUEUE"); taskQueue == "" {
28-
return nil, errors.New("TEMPORAL_TASK_QUEUE is not defined")
27+
if onboardingTaskQueue = os.Getenv("TEMPORAL_ONBOARDING_TASK_QUEUE"); onboardingTaskQueue == "" {
28+
return nil, errors.New("TEMPORAL_ONBOARDING_TASK_QUEUE is not defined")
2929
}
3030

3131
if usersDSN = os.Getenv("USERS_DSN"); usersDSN == "" {
3232
return nil, errors.New("USERS_DSN is not defined")
3333
}
3434

3535
return &Config{
36-
TemporalHostPort: hostport,
37-
TemporalNamespace: namespace,
38-
TemporalQueueName: taskQueue,
39-
UsersDSN: usersDSN,
36+
TemporalHostPort: hostport,
37+
TemporalOnboardingNamespace: onboardingNamespace,
38+
TemporalOnboardingQueueName: onboardingTaskQueue,
39+
UsersDSN: usersDSN,
4040
}, nil
4141
}

internal/onboarding/namespace.go

Lines changed: 12 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,24 @@
11
package onboarding
22

33
import (
4-
"context"
5-
"fmt"
6-
"time"
4+
"context"
5+
"fmt"
6+
"time"
77

8-
"google.golang.org/grpc/codes"
9-
"google.golang.org/grpc/status"
10-
"google.golang.org/protobuf/types/known/durationpb"
8+
"google.golang.org/protobuf/types/known/durationpb"
119

12-
"go.temporal.io/api/workflowservice/v1"
13-
"go.temporal.io/sdk/client"
10+
"go.temporal.io/api/workflowservice/v1"
11+
"go.temporal.io/sdk/client"
1412
)
1513

1614
func RegisterNamespace(ctx context.Context, hostPort, namespace string, retentionDays int) error {
17-
nsClient, err := client.NewNamespaceClient(client.Options{HostPort: hostPort})
18-
if err != nil {
19-
return fmt.Errorf("unable to create namespace client: %w", err)
20-
}
21-
defer nsClient.Close()
15+
nsClient, err := client.NewNamespaceClient(client.Options{HostPort: hostPort})
16+
if err != nil {
17+
return fmt.Errorf("unable to create namespace client: %w", err)
18+
}
19+
defer nsClient.Close()
2220
if _, err := nsClient.Describe(ctx, namespace); err == nil {
2321
return nil
24-
} else if s, ok := status.FromError(err); !ok || s.Code() != codes.NotFound {
25-
return fmt.Errorf("failed to describe namespace %q: %w", namespace, err)
2622
}
2723

2824
if retentionDays < 1 {
@@ -40,5 +36,5 @@ func RegisterNamespace(ctx context.Context, hostPort, namespace string, retentio
4036
return fmt.Errorf("failed to register namespace %q: %w", namespace, err)
4137
}
4238

43-
return nil
39+
return nil
4440
}

internal/onboarding/workflow/count-users.go

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,9 @@ import (
1313
)
1414

1515
func CountUsersWorkflow(ctx workflow.Context, dsn string) (int, error) {
16-
ao := workflow.ActivityOptions{}
16+
ao := workflow.ActivityOptions{
17+
StartToCloseTimeout: time.Minute * 5,
18+
}
1719

1820
ctx = workflow.WithActivityOptions(ctx, ao)
1921

@@ -25,17 +27,29 @@ func CountUsersWorkflow(ctx workflow.Context, dsn string) (int, error) {
2527
return count, nil
2628
}
2729

28-
func ExecuteCountUsersWorkflow(ctx context.Context, temporalClient client.Client, cfg onboarding.Config) (int, error) {
29-
wr, err := temporalClient.ExecuteWorkflow(ctx, client.StartWorkflowOptions{
30-
ID: fmt.Sprintf("count-users-workflow-%v", time.Now().Format("20060102150405")),
31-
TaskQueue: cfg.TemporalQueueName,
32-
}, CountUsersWorkflow, cfg.UsersDSN)
30+
func ExecuteCountUsersWorkflow(
31+
ctx context.Context,
32+
temporalClient client.Client,
33+
cfg onboarding.Config,
34+
) (int, error) {
35+
wr, err := temporalClient.ExecuteWorkflow(
36+
ctx,
37+
client.StartWorkflowOptions{
38+
ID: fmt.Sprintf("count-users-workflow-%v", time.Now().Format("20060102150405")),
39+
TaskQueue: cfg.TemporalOnboardingQueueName,
40+
},
41+
CountUsersWorkflow,
42+
cfg.UsersDSN,
43+
)
3344
if err != nil {
34-
return 0, err
45+
return 0, fmt.Errorf("failed to start CountUsersWorkflow: %w", err)
3546
}
3647

3748
var result int
38-
wr.Get(ctx, &result)
49+
err = wr.Get(ctx, &result)
50+
if err != nil {
51+
return 0, fmt.Errorf("failed to get result from CountUsersWorkflow: %w", err)
52+
}
3953

4054
return result, nil
4155
}

onboarding-worker.air.toml

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
root = "."
2+
testdata_dir = "testdata"
3+
tmp_dir = "tmp"
4+
5+
[build]
6+
args_bin = []
7+
bin = "./tmp/onboarding-worker"
8+
cmd = "go build -o ./tmp/onboarding-worker ./cmd/onboarding-worker/main.go"
9+
delay = 1000
10+
exclude_dir = ["assets", "tmp", "vendor", "testdata"]
11+
exclude_file = []
12+
exclude_regex = ["_test.go"]
13+
exclude_unchanged = false
14+
follow_symlink = false
15+
full_bin = ""
16+
include_dir = []
17+
include_ext = ["go"]
18+
include_file = []
19+
kill_delay = "0s"
20+
log = "onboarding-worker.build-errors.log"
21+
poll = false
22+
poll_interval = 0
23+
post_cmd = []
24+
pre_cmd = []
25+
rerun = false
26+
rerun_delay = 500
27+
send_interrupt = false
28+
stop_on_error = false
29+
30+
[color]
31+
app = ""
32+
build = "yellow"
33+
main = "magenta"
34+
runner = "green"
35+
watcher = "cyan"
36+
37+
[log]
38+
main_only = false
39+
time = false
40+
41+
[misc]
42+
clean_on_exit = false
43+
44+
[proxy]
45+
app_port = 0
46+
enabled = false
47+
proxy_port = 0
48+
49+
[screen]
50+
clear_on_rebuild = false
51+
keep_scroll = true

0 commit comments

Comments
 (0)