Skip to content

Commit 0a28bbe

Browse files
FEAT[DXTA-320]: Create tenant db workflow (#700)
* FEAT[DXTA-320]: Add create tenant db workflow * feat: Add organization creation activity * fix: check if dev group token env is empty
1 parent 0f00925 commit 0a28bbe

8 files changed

Lines changed: 467 additions & 3 deletions

File tree

cmd/onboarding-worker/main.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,12 @@ func main() {
2323
log.Fatalln("Failed to load github configuration:", err)
2424
}
2525

26+
createTenantConfig, err := onboarding.LoadCreateTenantConfig()
27+
28+
if err != nil {
29+
log.Fatalln("Failed to load create tenant configuration:", err)
30+
}
31+
2632
temporalClient, err := client.Dial(client.Options{
2733
HostPort: cfg.TemporalHostPort,
2834
Namespace: cfg.TemporalOnboardingNamespace,
@@ -54,13 +60,17 @@ func main() {
5460
githubInstallationActivities := activity.NewGithubInstallationActivities(*githubAppClient)
5561
tenantActivities := activity.NewTenantActivities()
5662
githubActivities := activity.NewGithubActivities(*githubConfig)
63+
createTenantActivities := activity.NewCreateTenantActivities(*createTenantConfig)
5764

5865
w.RegisterWorkflow(workflow.CountUsers)
5966
w.RegisterWorkflow(workflow.AfterGithubInstallationWorkflow)
67+
w.RegisterWorkflow(workflow.CreateTenantDBWorkflow)
68+
6069
w.RegisterActivity(userActivities)
6170
w.RegisterActivity(githubInstallationActivities)
6271
w.RegisterActivity(tenantActivities)
6372
w.RegisterActivity(githubActivities)
73+
w.RegisterActivity(createTenantActivities)
6474

6575
if err := w.Run(worker.InterruptCh()); err != nil {
6676
log.Fatalln("Worker failed to start", err)

internal/internal-api/data/db.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,10 @@ func NewDB(DBURL string, ctx context.Context) (DB, error) {
1717
driverName := otel.GetDriverName()
1818
devToken := os.Getenv("DXTA_DEV_GROUP_TOKEN")
1919

20+
if devToken == "" {
21+
return DB{}, errors.New("no dev group token provided")
22+
}
23+
2024
tenantDB, err := sql.Open(
2125
driverName,
2226
DBURL+"?authToken="+devToken,

internal/onboarding/activity/organization.go

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ func (ta *TenantActivities) GetOrganizationIDByAuthID(ctx context.Context, authI
2020
db, err := ta.GetCachedTenantDB(DBURL, ctx)
2121

2222
if err != nil {
23-
return 0, err
23+
return 0, errors.New("failed to get cached tenant DB: " + err.Error())
2424
}
2525

2626
var organizationId int64
@@ -38,3 +38,30 @@ func (ta *TenantActivities) GetOrganizationIDByAuthID(ctx context.Context, authI
3838

3939
return organizationId, nil
4040
}
41+
42+
func (ta *TenantActivities) CreateOrganization(
43+
ctx context.Context,
44+
organizationName string,
45+
authID string,
46+
DBURL string,
47+
) (bool, error) {
48+
49+
db, err := ta.GetCachedTenantDB(DBURL, ctx)
50+
51+
if err != nil {
52+
return false, errors.New("failed to get cached tenant DB: " + err.Error())
53+
}
54+
55+
_, err = db.QueryContext(ctx, `
56+
INSERT INTO organizations
57+
(name, auth_id)
58+
VALUES
59+
(?, ?);`,
60+
organizationName, authID)
61+
62+
if err != nil {
63+
return false, errors.New("failed to create organization: " + err.Error())
64+
}
65+
66+
return true, nil
67+
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
package activity
2+
3+
import (
4+
"context"
5+
"errors"
6+
)
7+
8+
func (ta *TenantActivities) UpsertTenantDBInfo(
9+
ctx context.Context,
10+
DBName string,
11+
DBURL string,
12+
DBDomainName string,
13+
) (bool, error) {
14+
db, err := ta.GetCachedTenantDB(DBURL, ctx)
15+
16+
if err != nil {
17+
return false, err
18+
}
19+
20+
_, err = db.QueryContext(ctx, `
21+
INSERT INTO settings
22+
(tenant_name, tenant_domain)
23+
VALUES
24+
(?, ?);`,
25+
DBName, DBDomainName,
26+
)
27+
28+
if err != nil {
29+
return false, errors.New("Failed to upsert tenant db info: " + err.Error())
30+
}
31+
32+
return true, nil
33+
}
Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
package activity
2+
3+
import (
4+
"bytes"
5+
"context"
6+
"encoding/json"
7+
"errors"
8+
"fmt"
9+
"io"
10+
"net/http"
11+
12+
"github.com/dxta-dev/app/internal/onboarding"
13+
)
14+
15+
type DatabaseData struct {
16+
DBID string `json:"DbId"`
17+
Hostname string `json:"Hostname"`
18+
Name string `json:"Name"`
19+
DBURL string
20+
}
21+
type CreateTenantDBRes struct {
22+
Database DatabaseData `json:"database"`
23+
}
24+
25+
type CreateTenantActivities struct {
26+
config onboarding.CreateTenantConfig
27+
}
28+
29+
type SeedOpts struct {
30+
Type string `json:"type"`
31+
Name string `json:"name"`
32+
}
33+
type TenantDBRequest struct {
34+
Name string `json:"name"`
35+
Group string `json:"group"`
36+
Seed SeedOpts `json:"seed"`
37+
}
38+
39+
func NewCreateTenantActivities(config onboarding.CreateTenantConfig) *CreateTenantActivities {
40+
return &CreateTenantActivities{config}
41+
}
42+
43+
func (cta CreateTenantActivities) CreateTenantDB(
44+
ctx context.Context,
45+
dbDomainName string,
46+
) (*CreateTenantDBRes, error) {
47+
reqBody := TenantDBRequest{
48+
Name: dbDomainName,
49+
Group: cta.config.TursoDBGroupName,
50+
Seed: SeedOpts{
51+
Type: "database",
52+
Name: cta.config.TenantSeedDBURL,
53+
},
54+
}
55+
56+
jsonBody, err := json.Marshal(reqBody)
57+
58+
if err != nil {
59+
return nil, errors.New("failed while marshalling request body: " + err.Error())
60+
}
61+
62+
apiUrl := fmt.Sprintf("%s/organizations/%s/databases", cta.config.TursoApiURL, cta.config.TursoOrganizationSlug)
63+
64+
req, err := http.NewRequest("POST", apiUrl, bytes.NewBuffer(jsonBody))
65+
66+
if err != nil {
67+
return nil, errors.New("failed to create HTTP request: " + err.Error())
68+
}
69+
70+
req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", cta.config.TursoAuthToken))
71+
req.Header.Set("Content-Type", "application/json")
72+
73+
client := http.Client{}
74+
75+
response, err := client.Do(req)
76+
77+
if err != nil {
78+
return nil, errors.New("failed to send HTTP request: " + err.Error())
79+
}
80+
81+
defer response.Body.Close()
82+
83+
if response.StatusCode != http.StatusOK {
84+
return nil, errors.New("failed to create new tenant db with status code: " + fmt.Sprint(response.StatusCode))
85+
}
86+
87+
responseBodyBytes, err := io.ReadAll(response.Body)
88+
89+
if err != nil {
90+
return nil, errors.New("failed to read response body: " + err.Error())
91+
}
92+
93+
var body CreateTenantDBRes
94+
95+
err = json.Unmarshal(responseBodyBytes, &body)
96+
97+
if err != nil {
98+
return nil, errors.New("failed to unmarshal response body: " + err.Error())
99+
}
100+
101+
fmt.Printf("Success! Data: %v", body)
102+
103+
body.Database.DBURL = fmt.Sprintf("libsql://%s", body.Database.Hostname)
104+
105+
return &body, nil
106+
}
107+
108+
func (cta CreateTenantActivities) AddTenantDBToMap(
109+
ctx context.Context,
110+
authId string,
111+
DBName string,
112+
DBURL string,
113+
DBDomainName string,
114+
) (bool, error) {
115+
db, err := onboarding.NewDB(cta.config.OrganizationsTenantMapDBURL, ctx)
116+
117+
if err != nil {
118+
return false, errors.New("failed to get organizations-tenant-map db: " + err.Error())
119+
}
120+
121+
defer db.DB.Close()
122+
123+
_, err = db.DB.QueryContext(ctx, `
124+
INSERT INTO tenants
125+
(organization_id, db_url, name, domain)
126+
VALUES (?, ?, ?, ?);`, authId, DBURL, DBName, DBDomainName)
127+
128+
if err != nil {
129+
return false, errors.New("failed to store tenant db data to organizations-tenant-map db: " + err.Error())
130+
}
131+
132+
return true, nil
133+
}

internal/onboarding/tenant.go

Lines changed: 90 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,19 +4,97 @@ import (
44
"context"
55
"database/sql"
66
"errors"
7+
"os"
78
"sync"
89

9-
internal_api_data "github.com/dxta-dev/app/internal/internal-api/data"
10+
"github.com/dxta-dev/app/internal/otel"
1011
)
1112

1213
var tenantDBConnections = sync.Map{}
1314

15+
type CreateTenantConfig struct {
16+
TursoApiURL string
17+
TursoOrganizationSlug string
18+
TursoDBGroupName string
19+
TursoAuthToken string
20+
TenantSeedDBURL string
21+
OrganizationsTenantMapDBURL string
22+
}
23+
24+
func LoadCreateTenantConfig() (*CreateTenantConfig, error) {
25+
var tursoAuthToken, tenantSeedDBURL, organizationsTenantMapDBURL, tursoApiUrl, tursoOrganizationSlug, tursoDBGroupName string
26+
27+
if tursoAuthToken = os.Getenv("TURSO_AUTH_TOKEN"); tursoAuthToken == "" {
28+
return nil, errors.New("turso auth token not defined")
29+
}
30+
31+
if tenantSeedDBURL = os.Getenv("TENANT_SEED_DB_NAME"); tenantSeedDBURL == "" {
32+
return nil, errors.New("seed db url not defined")
33+
}
34+
35+
if organizationsTenantMapDBURL = os.Getenv("ORGANIZATIONS_TENANT_MAP_DB_URL"); organizationsTenantMapDBURL == "" {
36+
return nil, errors.New("organizations tenant map db url not defined")
37+
}
38+
39+
if tursoApiUrl = os.Getenv("TURSO_API_URL"); tursoApiUrl == "" {
40+
return nil, errors.New("turso api url not defined")
41+
}
42+
43+
if tursoOrganizationSlug = os.Getenv("TURSO_ORGANIZATION_SLUG"); tursoOrganizationSlug == "" {
44+
return nil, errors.New("turso organization slug not defined")
45+
}
46+
47+
if tursoDBGroupName = os.Getenv("TURSO_DB_GROUP_NAME"); tursoDBGroupName == "" {
48+
return nil, errors.New("turso db group name not defined")
49+
}
50+
51+
return &CreateTenantConfig{
52+
TursoApiURL: tursoApiUrl,
53+
TursoOrganizationSlug: tursoOrganizationSlug,
54+
TursoDBGroupName: tursoDBGroupName,
55+
TursoAuthToken: tursoAuthToken,
56+
TenantSeedDBURL: tenantSeedDBURL,
57+
OrganizationsTenantMapDBURL: organizationsTenantMapDBURL,
58+
}, nil
59+
60+
}
61+
62+
type DB struct {
63+
DB *sql.DB
64+
}
65+
66+
func NewDB(DBURL string, ctx context.Context) (DB, error) {
67+
driverName := otel.GetDriverName()
68+
devToken := os.Getenv("DXTA_DEV_GROUP_TOKEN")
69+
70+
if devToken == "" {
71+
return DB{}, errors.New("no dev group token provided")
72+
}
73+
74+
tenantDB, err := sql.Open(
75+
driverName,
76+
DBURL+"?authToken="+devToken,
77+
)
78+
79+
if err != nil {
80+
return DB{}, errors.New("failed to open db connection " + err.Error())
81+
}
82+
83+
if err := tenantDB.PingContext(ctx); err != nil {
84+
return DB{}, errors.New("failed to verify db connection " + err.Error())
85+
}
86+
87+
return DB{
88+
DB: tenantDB,
89+
}, nil
90+
}
91+
1492
func GetCachedTenantDB(DBURL string, ctx context.Context) (*sql.DB, error) {
1593
if cachedDB, ok := tenantDBConnections.Load(DBURL); ok {
1694
return cachedDB.(*sql.DB), nil
1795
}
1896

19-
db, err := internal_api_data.NewDB(DBURL, ctx)
97+
db, err := NewDB(DBURL, ctx)
2098

2199
if err != nil {
22100
return nil, errors.New("failed to create tenant db connection: " + err.Error())
@@ -26,3 +104,13 @@ func GetCachedTenantDB(DBURL string, ctx context.Context) (*sql.DB, error) {
26104

27105
return db.DB, nil
28106
}
107+
108+
func GetDB(ctx context.Context, DBURL string) (*sql.DB, error) {
109+
db, err := NewDB(DBURL, ctx)
110+
111+
if err != nil {
112+
return nil, errors.New("failed to create db connection: " + err.Error())
113+
}
114+
115+
return db.DB, nil
116+
}

0 commit comments

Comments
 (0)