Skip to content

Commit 810b9da

Browse files
committed
modify mongo driver.
1 parent 09aba83 commit 810b9da

5 files changed

Lines changed: 143 additions & 90 deletions

File tree

cache/driver/mongo/engine.go

Lines changed: 88 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,19 @@ package mongo
22

33
import "github.com/cloudtask/cloudtask-center/cache/driver/types"
44
import "github.com/cloudtask/common/models"
5+
import "github.com/cloudtask/libtools/gounits/system"
56
import "gopkg.in/mgo.v2/bson"
67
import mgo "gopkg.in/mgo.v2"
78

89
import (
10+
"strings"
911
"time"
1012
)
1113

14+
const (
15+
defaultMaxPoolSize = 20
16+
)
17+
1218
const (
1319
SYS_LOCATIONNS = "sys_locations"
1420
SYS_JOBS = "sys_jobs"
@@ -19,47 +25,100 @@ type M bson.M
1925

2026
type D bson.D
2127

28+
//MgoConfigs is exported
29+
type MgoConfigs struct {
30+
Hosts string
31+
DataBase string
32+
Auth map[string]string
33+
Options []string
34+
}
35+
2236
//Engine is exported
2337
type Engine struct {
24-
Hosts string
25-
DBName string
26-
User string
27-
Password string
28-
MaxPoolSize int
38+
MgoConfigs
2939
globalSession *mgo.Session
3040
failPulseTimes int
3141
stopCh chan struct{}
3242
}
3343

3444
//NewEngine is exported
35-
func NewEngine(hosts string, dbName string, maxPoolSize int, user string, password string) *Engine {
45+
func NewEngine(configs MgoConfigs) *Engine {
3646

3747
return &Engine{
38-
Hosts: hosts,
39-
DBName: dbName,
40-
User: user,
41-
Password: password,
42-
MaxPoolSize: maxPoolSize,
43-
stopCh: make(chan struct{}),
48+
MgoConfigs: configs,
49+
stopCh: make(chan struct{}),
4450
}
4551
}
4652

53+
func generateHostURL(configs MgoConfigs) (string, error) {
54+
55+
configs.Hosts = strings.TrimSpace(configs.Hosts)
56+
if len(configs.Hosts) == 0 {
57+
return "", ErrMongoStorageDriverHostsInvalid
58+
}
59+
60+
configs.DataBase = strings.TrimSpace(configs.DataBase)
61+
if len(configs.DataBase) == 0 {
62+
return "", ErrMongoStorageDriverDataBaseInvalid
63+
}
64+
65+
var authStr string
66+
if len(configs.Auth) > 0 {
67+
var (
68+
user, password string
69+
ret bool
70+
)
71+
if user, ret = configs.Auth["user"]; ret {
72+
authStr = user
73+
if password, ret = configs.Auth["password"]; ret {
74+
authStr = authStr + ":" + password
75+
}
76+
authStr = authStr + "@"
77+
}
78+
}
79+
80+
var optsStr string
81+
if len(configs.Options) > 0 {
82+
for index, value := range configs.Options {
83+
optsStr = optsStr + value
84+
if index != len(configs.Options)-1 {
85+
optsStr = optsStr + "&"
86+
}
87+
}
88+
}
89+
90+
mgoURL := "mongodb://" + authStr + configs.Hosts + "/" + configs.DataBase
91+
if optsStr != "" {
92+
mgoURL = mgoURL + "?" + optsStr
93+
}
94+
95+
if _, err := mgo.ParseURL(mgoURL); err != nil {
96+
return "", err
97+
}
98+
return mgoURL, nil
99+
}
100+
47101
//Open is exported
48102
func (engine *Engine) Open() error {
49103

50-
session, err := mgo.Dial(engine.Hosts)
104+
var maxPoolSize = defaultMaxPoolSize
105+
opts := system.DriverOpts(engine.Options)
106+
if value, ret := opts.Int("maxPoolSize", ""); ret {
107+
maxPoolSize = (int)(value)
108+
}
109+
110+
mgoURL, err := generateHostURL(engine.MgoConfigs)
51111
if err != nil {
52112
return err
53113
}
54114

55-
session.SetMode(mgo.Strong, true)
56-
session.SetPoolLimit(engine.MaxPoolSize)
57-
database := session.DB(engine.DBName)
58-
if engine.User != "" {
59-
if err := database.Login(engine.User, engine.Password); err != nil {
60-
return err
61-
}
115+
session, err := mgo.Dial(mgoURL)
116+
if err != nil {
117+
return err
62118
}
119+
120+
session.SetMode(mgo.Strong, true)
121+
session.SetPoolLimit(maxPoolSize)
63122
engine.globalSession = session
64123
go engine.pulseSessionLoop()
65124
return nil
@@ -79,7 +138,7 @@ func (engine *Engine) getLocation(location string) (*models.WorkLocation, error)
79138
session := engine.getSession()
80139
defer session.Close()
81140
workLocation := &models.WorkLocation{}
82-
if err := session.DB(engine.DBName).C(SYS_LOCATIONNS).
141+
if err := session.DB(engine.DataBase).C(SYS_LOCATIONNS).
83142
Find(M{"location": location}).
84143
Select(M{"_id": 0}).One(workLocation); err != nil {
85144
if err == mgo.ErrNotFound {
@@ -94,15 +153,15 @@ func (engine *Engine) postLocation(workLocation *models.WorkLocation) error {
94153

95154
session := engine.getSession()
96155
defer session.Close()
97-
return session.DB(engine.DBName).C(SYS_LOCATIONNS).
156+
return session.DB(engine.DataBase).C(SYS_LOCATIONNS).
98157
Insert(workLocation)
99158
}
100159

101160
func (engine *Engine) putLocation(workLocation *models.WorkLocation) error {
102161

103162
session := engine.getSession()
104163
defer session.Close()
105-
return session.DB(engine.DBName).C(SYS_LOCATIONNS).
164+
return session.DB(engine.DataBase).C(SYS_LOCATIONNS).
106165
Update(M{"location": workLocation.Location}, workLocation)
107166
}
108167

@@ -111,7 +170,7 @@ func (engine *Engine) readLocationsName() ([]string, error) {
111170
session := engine.getSession()
112171
defer session.Close()
113172
workLocations := []*models.WorkLocation{}
114-
if err := session.DB(engine.DBName).C(SYS_LOCATIONNS).
173+
if err := session.DB(engine.DataBase).C(SYS_LOCATIONNS).
115174
Find(M{}).
116175
Select(M{"_id": 0, "location": 1}).
117176
All(&workLocations); err != nil {
@@ -130,7 +189,7 @@ func (engine *Engine) readSimpleJobs(query M) ([]*models.SimpleJob, error) {
130189
session := engine.getSession()
131190
defer session.Close()
132191
jobs := []*models.SimpleJob{}
133-
if err := session.DB(engine.DBName).C(SYS_JOBS).
192+
if err := session.DB(engine.DataBase).C(SYS_JOBS).
134193
Find(query).
135194
Select(M{"_id": 0, "jobid": 1, "name": 1, "location": 1, "groupid": 1, "servers": 1, "enabled": 1, "stat": 1}).
136195
All(&jobs); err != nil {
@@ -144,7 +203,7 @@ func (engine *Engine) readJobs(query M) ([]*models.Job, error) {
144203
session := engine.getSession()
145204
defer session.Close()
146205
jobs := []*models.Job{}
147-
if err := session.DB(engine.DBName).C(SYS_JOBS).
206+
if err := session.DB(engine.DataBase).C(SYS_JOBS).
148207
Find(query).
149208
Select(M{"_id": 0}).
150209
All(&jobs); err != nil {
@@ -176,7 +235,7 @@ func (engine *Engine) getJob(jobid string) (*models.Job, error) {
176235
session := engine.getSession()
177236
defer session.Close()
178237
job := &models.Job{}
179-
if err := session.DB(engine.DBName).C(SYS_JOBS).
238+
if err := session.DB(engine.DataBase).C(SYS_JOBS).
180239
Find(M{"jobid": jobid}).
181240
Select(M{"_id": 0}).One(job); err != nil {
182241
if err == mgo.ErrNotFound {
@@ -191,15 +250,15 @@ func (engine *Engine) putJob(job *models.Job) error {
191250

192251
session := engine.getSession()
193252
defer session.Close()
194-
return session.DB(engine.DBName).C(SYS_JOBS).
253+
return session.DB(engine.DataBase).C(SYS_JOBS).
195254
Update(M{"jobid": job.JobId}, job)
196255
}
197256

198257
func (engine *Engine) postJobLog(jobLog *models.JobLog) error {
199258

200259
session := engine.getSession()
201260
defer session.Close()
202-
return session.DB(engine.DBName).C(SYS_LOGS).
261+
return session.DB(engine.DataBase).C(SYS_LOGS).
203262
Insert(jobLog)
204263
}
205264

cache/driver/mongo/mongo.go

Lines changed: 25 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -7,16 +7,9 @@ import "github.com/cloudtask/libtools/gounits/logger"
77

88
import (
99
"errors"
10-
"net/url"
11-
"path"
12-
"strconv"
1310
"sync"
1411
)
1512

16-
const (
17-
defaultMaxPoolSize = 20
18-
)
19-
2013
var (
2114
ErrMongoStorageDriverHostsInvalid = errors.New("mongo storage driver hosts invalid.")
2215
ErrMongoStorageDriverDataBaseInvalid = errors.New("mongo storage driver database invalid.")
@@ -37,58 +30,48 @@ func init() {
3730
func New(parameters types.Parameters) (driver.StorageDriver, error) {
3831

3932
var (
40-
value interface{}
41-
ret bool
42-
rawHosts string
43-
dbName string
44-
user string
45-
password string
46-
maxPoolSize int
33+
ret bool
34+
value interface{}
4735
)
4836

49-
value, ret = parameters["hosts"]
50-
if !ret {
51-
return nil, ErrMongoStorageDriverHostsInvalid
37+
mgoConfigs := MgoConfigs{
38+
Auth: map[string]string{},
39+
Options: []string{},
5240
}
5341

54-
pHosts, err := url.Parse(value.(string))
55-
if err != nil {
42+
value, ret = parameters["hosts"]
43+
if !ret {
5644
return nil, ErrMongoStorageDriverHostsInvalid
5745
}
58-
59-
scheme := pHosts.Scheme
60-
if scheme == "" {
61-
scheme = "mongodb"
62-
}
63-
64-
maxPoolSize = defaultMaxPoolSize
65-
queryPoolSize := pHosts.Query().Get("maxPoolSize")
66-
if queryPoolSize != "" {
67-
if pValue, err := strconv.Atoi(queryPoolSize); err == nil {
68-
maxPoolSize = pValue
69-
}
70-
}
46+
mgoConfigs.Hosts = value.(string)
7147

7248
value, ret = parameters["database"]
7349
if !ret {
7450
return nil, ErrMongoStorageDriverDataBaseInvalid
7551
}
52+
mgoConfigs.DataBase = value.(string)
7653

77-
if dbName = value.(string); dbName == "" {
78-
return nil, ErrMongoStorageDriverDataBaseInvalid
79-
}
80-
81-
if value, ret = parameters["user"]; ret {
82-
user = value.(string)
54+
if value, ret = parameters["auth"]; ret {
55+
if auth, ok := value.(map[interface{}]interface{}); ok {
56+
if user, ok := auth["user"]; ok {
57+
mgoConfigs.Auth["user"] = user.(string)
58+
}
59+
if password, ok := auth["password"]; ok {
60+
mgoConfigs.Auth["password"] = password.(string)
61+
}
62+
}
8363
}
8464

85-
if value, ret = parameters["password"]; ret {
86-
password = value.(string)
65+
if value, ret = parameters["options"]; ret {
66+
if options, ok := value.([]interface{}); ok {
67+
for _, option := range options {
68+
mgoConfigs.Options = append(mgoConfigs.Options, option.(string))
69+
}
70+
}
8771
}
8872

89-
rawHosts = scheme + "://" + pHosts.Host + path.Clean(pHosts.Path) + "?" + pHosts.RawQuery
9073
return &MongoStorageDriver{
91-
engine: NewEngine(rawHosts, dbName, maxPoolSize, user, password),
74+
engine: NewEngine(mgoConfigs),
9275
}, nil
9376
}
9477

etc/jobserver.yaml

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,33 +1,42 @@
1-
version: v.2.5.0
1+
version: v.2.0.0
22
pidfile: ./jobserver.pid
33
retrystartup: true
44
cluster:
55
hosts: 192.168.2.80:2181,192.168.2.81:2181,192.168.2.82:2181
6-
root: /cloudtask_dev
6+
root: /cloudtask
77
pulse: 30s
88
timeout: 60s
99
threshold: 1
1010
api:
1111
hosts: [":8985"]
1212
enablecors: true
1313
scheduler:
14-
allocmode: hash #pref
14+
allocmode: hash
1515
allocrecovery: 320s
1616
cache:
1717
lrusize: 1024
1818
storage:
1919
mongo:
20-
hosts: mongodb://192.168.2.80:27017,192.168.2.81:27017,192.168.2.82:27017/cloudtask_dev?replicaSet=mgoCluster&maxPoolSize=20
21-
database: cloudtask_dev
20+
hosts: 192.168.2.80:27017,192.168.2.81:27017,192.168.2.82:27017
21+
database: cloudtask
22+
#auth: {
23+
# user: datastoreAdmin,
24+
# password: ds4dev
25+
#}
26+
options: [
27+
"maxPoolSize=20"
28+
"replicaSet=mgoCluster",
29+
#"authSource=admin",
30+
]
2231
notifications:
2332
endpoints:
2433
- name: smtp
2534
host: smtp.example.com
26-
port: 456
27-
user: abc
28-
password: 123456
29-
sender: abc@.example.com
30-
enabled: false
35+
port: 25
36+
user:
37+
password:
38+
sender: cloudtask@example.com
39+
enabled: true
3140
logger:
3241
logfile: ./logs/jobserver.log
3342
loglevel: info

0 commit comments

Comments
 (0)