Skip to content

Commit 6e21be1

Browse files
Feat/reindex (#61)
* feat: reindex src -> dest route with mappings * feat: handle mappings and data cloning with action * fix: checkVar method * format: grammar and move relevant code inside the condition
1 parent c17ff73 commit 6e21be1

7 files changed

Lines changed: 125 additions & 66 deletions

File tree

go.mod

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,11 @@ require (
1313
github.com/robfig/cron v1.1.0
1414
github.com/rogpeppe/go-internal v1.2.2 // indirect
1515
github.com/rs/cors v1.6.0
16-
github.com/siddharthlatest/mustache v0.0.0-20160118163553-00029677272d
1716
github.com/smartystreets/goconvey v1.6.4
18-
github.com/stretchr/testify v1.3.0
1917
github.com/ulule/limiter v2.2.0+incompatible
2018
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2
2119
gopkg.in/natefinch/lumberjack.v2 v2.0.0
2220
gopkg.in/olivere/elastic.v6 v6.2.26
2321
)
22+
23+
go 1.13

go.sum

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@ github.com/fatih/color v1.7.0 h1:DkWD4oS2D8LGGgTQ6IvwJJXSL5Vp2ffcQg58nFV38Ys=
3333
github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
3434
github.com/fatih/structs v1.0.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M=
3535
github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M=
36-
github.com/fortytw2/leaktest v1.2.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g=
3736
github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw=
3837
github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g=
3938
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
@@ -227,6 +226,7 @@ github.com/google/go-cmp v0.3.0 h1:crn/baboCvb5fXaQ0IJ1SGTsTVrWpDsCWC8EGETZijY=
227226
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
228227
github.com/google/uuid v1.0.0 h1:b4Gk+7WdP/d3HZH8EJsZpvV7EtDOgaZLtnaNGIu1adA=
229228
github.com/google/uuid v1.0.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
229+
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 h1:EGx4pi6eqNxGaHF6qqu48+N2wcFQ5qg5FXgOdqsJ5d8=
230230
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
231231
github.com/gorilla/context v1.1.1/go.mod h1:kBGZzfjB9CEq2AlWe17Uuf7NDRt0dE0s8S51q0aT7Yg=
232232
github.com/gorilla/mux v1.6.2 h1:Pgr17XVTNXAk3q/r4CpKzC5xBM/qW1uVLV+IhRZpIIk=
@@ -266,13 +266,14 @@ github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+o
266266
github.com/konsorten/go-windows-terminal-sequences v0.0.0-20180402223658-b729f2633dfe/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
267267
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
268268
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
269+
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
269270
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
270271
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
271272
github.com/kr/pty v1.1.3/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
273+
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
272274
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
273275
github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
274276
github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
275-
github.com/mailru/easyjson v0.0.0-20180823135443-60711f1a8329/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc=
276277
github.com/mailru/easyjson v0.0.0-20190312143242-1de009706dbe h1:W/GaMY0y69G4cFlmsC6B9sbuo2fP8OFP1ABjt4kPz+w=
277278
github.com/mailru/easyjson v0.0.0-20190312143242-1de009706dbe/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc=
278279
github.com/markbates/deplist v1.0.4/go.mod h1:gRRbPbbuA8TmMiRvaOzUlRfzfjeCCBqX2A6arxN01MM=
@@ -354,7 +355,6 @@ github.com/shurcooL/highlight_diff v0.0.0-20170515013008-09bb4053de1b/go.mod h1:
354355
github.com/shurcooL/highlight_go v0.0.0-20170515013102-78fb10f4a5f8/go.mod h1:UDKB5a1T23gOMUJrI+uSuH0VRDStOiUVSjBTRDVBVag=
355356
github.com/shurcooL/octicon v0.0.0-20180602230221-c42b0e3b24d9/go.mod h1:eWdoE5JD4R5UVWDucdOPg1g2fqQRq78IQa9zlOV1vpQ=
356357
github.com/shurcooL/sanitized_anchor_name v0.0.0-20170918181015-86672fcb3f95/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
357-
github.com/siddharthlatest/mustache v0.0.0-20160118163553-00029677272d/go.mod h1:ej02m5sGsrB5WBnM5xLGTfxUInBAbfNFPAsoJxBjn50=
358358
github.com/sirupsen/logrus v1.0.6/go.mod h1:pMByvHTf9Beacp5x1UXfOR9xyW/9antXMhjMPG0dEzc=
359359
github.com/sirupsen/logrus v1.1.0/go.mod h1:zrgwTnHtNr00buQ1vSptGe8m1f/BbgsPukg8qsT7A+A=
360360
github.com/sirupsen/logrus v1.1.1/go.mod h1:zrgwTnHtNr00buQ1vSptGe8m1f/BbgsPukg8qsT7A+A=
@@ -459,6 +459,7 @@ golang.org/x/sys v0.0.0-20190102155601-82a175fd1598/go.mod h1:STP8DvDyc/dI5b8T5h
459459
golang.org/x/sys v0.0.0-20190116161447-11f53e031339/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
460460
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
461461
golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
462+
golang.org/x/sys v0.0.0-20191008105621-543471e840be h1:QAcqgptGM8IQBC9K/RC4o+O9YmqEm0diQn9QmZw/0mU=
462463
golang.org/x/sys v0.0.0-20191008105621-543471e840be/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
463464
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
464465
golang.org/x/tools v0.0.0-20180828015842-6cd1fcedba52/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
@@ -501,6 +502,7 @@ gopkg.in/airbrake/gobrake.v2 v2.0.9/go.mod h1:/h5ZAUhDkGaJfjzjKLSjv6zCL6O0LLBxU4
501502
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
502503
gopkg.in/alexcesaro/quotedprintable.v3 v3.0.0-20150716171945-2caba252f4dc/go.mod h1:m7x9LTH6d71AHyAX77c9yqWCCa3UKHcVEj9y7hAtKDk=
503504
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
505+
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
504506
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
505507
gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
506508
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=

plugins/elasticsearch/routes.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,10 @@ import (
1111
"sync"
1212

1313
"github.com/appbaseio/arc/middleware"
14-
"github.com/appbaseio/arc/plugins"
1514
"github.com/appbaseio/arc/model/acl"
1615
"github.com/appbaseio/arc/model/category"
1716
"github.com/appbaseio/arc/model/op"
17+
"github.com/appbaseio/arc/plugins"
1818
"github.com/appbaseio/arc/util"
1919
"github.com/gobuffalo/packr"
2020
)
@@ -49,7 +49,7 @@ type spec struct {
4949
} `json:"body,omitempty"`
5050
}
5151

52-
func (es *elasticsearch) preprocess(mw [] middleware.Middleware) error {
52+
func (es *elasticsearch) preprocess(mw []middleware.Middleware) error {
5353
files := make(chan string)
5454
apis := make(chan api)
5555

plugins/registry.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,14 +37,14 @@ type Plugin interface {
3737
InitFunc() error
3838

3939
// The plugin's elastic search middleware, if any.
40-
ESMiddleware() [] middleware.Middleware
40+
ESMiddleware() []middleware.Middleware
4141
}
4242

4343
// ElasticSearchPlugin holds the plugin for ES
4444
type ESPlugin interface {
4545
nameRoutes
46-
47-
// mw takes a array of middleware to be intialized by ES Plugin
46+
47+
// mw takes a array of middleware to be intialized by ES Plugin
4848
InitFunc(mw []middleware.Middleware) error
4949
}
5050

plugins/reindexer/dao.go

Lines changed: 37 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import (
2424
//
2525
// We accept a query param `wait_for_completion` which defaults to true, which when false, we don't create any aliases
2626
// and delete the old index, we instead return the tasks API response.
27-
func reindex(ctx context.Context, indexName string, config *reindexConfig, waitForCompletion bool) ([]byte, error) {
27+
func reindex(ctx context.Context, sourceIndex string, config *reindexConfig, waitForCompletion bool, destinationIndex string) ([]byte, error) {
2828
var err error
2929

3030
// We fetch the index name pointing to the given alias first.
@@ -34,41 +34,44 @@ func reindex(ctx context.Context, indexName string, config *reindexConfig, waitF
3434
// from the given alias. If alias name doesn't exist we get an
3535
// empty slice of indices, which means the index has never been
3636
// reindexed before.
37-
indices, err := getIndicesByAlias(ctx, indexName)
37+
indices, err := getIndicesByAlias(ctx, sourceIndex)
3838
if err != nil {
3939
log.Println(err)
4040
}
4141
if len(indices) > 1 {
42-
return nil, fmt.Errorf(`multiple indices pointing to alias "%s"`, indexName)
42+
return nil, fmt.Errorf(`multiple indices pointing to alias "%s"`, sourceIndex)
4343
}
4444
if len(indices) == 1 {
45-
indexName = indices[0]
45+
sourceIndex = indices[0]
4646
}
4747

4848
// If mappings are not passed, we fetch the mappings of the old index.
4949
if config.Mappings == nil {
50-
config.Mappings, err = mappingsOf(ctx, indexName)
50+
config.Mappings, err = mappingsOf(ctx, sourceIndex)
5151
if err != nil {
52-
return nil, fmt.Errorf(`error fetching mappings of index "%s": %v`, indexName, err)
52+
return nil, fmt.Errorf(`error fetching mappings of index "%s": %v`, sourceIndex, err)
5353
}
5454
}
5555

5656
// If settings are not passed, we fetch the settings of the old index.
5757
if config.Settings == nil {
58-
config.Settings, err = settingsOf(ctx, indexName)
58+
config.Settings, err = settingsOf(ctx, sourceIndex)
5959
if err != nil {
60-
return nil, fmt.Errorf(`error fetching settings of index "%s": %v`, indexName, err)
60+
return nil, fmt.Errorf(`error fetching settings of index "%s": %v`, sourceIndex, err)
6161
}
6262
}
6363

6464
// Setup the destination index prior to running the _reindex action.
6565
body := make(map[string]interface{})
6666
body["mappings"] = config.Mappings
6767
body["settings"] = config.Settings
68+
newIndexName := destinationIndex
69+
if destinationIndex == "" {
70+
newIndexName, err = reindexedName(sourceIndex)
71+
}
6872

69-
newIndexName, err := reindexedName(indexName)
7073
if err != nil {
71-
return nil, fmt.Errorf(`error generating a new index name for index "%s": %v`, indexName, err)
74+
return nil, fmt.Errorf(`error generating a new index name for index "%s": %v`, sourceIndex, err)
7275
}
7376

7477
// Create the new index.
@@ -77,9 +80,14 @@ func reindex(ctx context.Context, indexName string, config *reindexConfig, waitF
7780
return nil, err
7881
}
7982

83+
// abruptly return if action is mappings
84+
if config.Action == "mappings" {
85+
return nil, nil
86+
}
87+
8088
// Configure reindex source
8189
src := es7.NewReindexSource().
82-
Index(indexName).
90+
Index(sourceIndex).
8391
Type(config.Types...).
8492
FetchSourceIncludeExclude(config.Include, config.Exclude)
8593

@@ -103,23 +111,24 @@ func reindex(ctx context.Context, indexName string, config *reindexConfig, waitF
103111
return nil, err
104112
}
105113

106-
// Fetch all the aliases of old index
107-
aliases, err := aliasesOf(ctx, indexName)
108-
if err != nil {
109-
return nil, fmt.Errorf(`error fetching aliases of index "%s": %v`, indexName, err)
110-
}
111-
aliases = append(aliases, indexName)
112-
113-
// Delete old index
114-
err = deleteIndex(ctx, indexName)
115-
if err != nil {
116-
return nil, fmt.Errorf(`error deleting index "%s": %v\n`, indexName, err)
117-
}
118-
119-
// Set aliases of old index to the new index.
120-
err = setAlias(ctx, newIndexName, aliases...)
121-
if err != nil {
122-
return nil, fmt.Errorf(`error setting alias "%s" for index "%s"`, indexName, newIndexName)
114+
if destinationIndex == "" {
115+
// Fetch all the aliases of old index
116+
aliases, err := aliasesOf(ctx, sourceIndex)
117+
if err != nil {
118+
return nil, fmt.Errorf(`error fetching aliases of index "%s": %v`, sourceIndex, err)
119+
}
120+
aliases = append(aliases, sourceIndex)
121+
122+
// Delete old index
123+
err = deleteIndex(ctx, sourceIndex)
124+
if err != nil {
125+
return nil, fmt.Errorf(`error deleting index "%s": %v\n`, sourceIndex, err)
126+
}
127+
// Set aliases of old index to the new index.
128+
err = setAlias(ctx, newIndexName, aliases...)
129+
if err != nil {
130+
return nil, fmt.Errorf(`error setting alias "%s" for index "%s"`, sourceIndex, newIndexName)
131+
}
123132
}
124133

125134
return json.Marshal(response)

plugins/reindexer/handlers.go

Lines changed: 69 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -17,52 +17,93 @@ type reindexConfig struct {
1717
Include []string `json:"include_fields"`
1818
Exclude []string `json:"exclude_fields"`
1919
Types []string `json:"types"`
20+
Action string `json:"action"`
2021
}
2122

2223
func (rx *reindexer) reindex() http.HandlerFunc {
2324
return func(w http.ResponseWriter, req *http.Request) {
2425
vars := mux.Vars(req)
2526
indexName, ok := vars["index"]
26-
if !ok {
27-
util.WriteBackError(w, "Route inconsistency, expecting var {index}", http.StatusInternalServerError)
27+
if checkVar(ok, w, "index") {
2828
return
2929
}
3030

31-
reqBody, err := ioutil.ReadAll(req.Body)
32-
if err != nil {
33-
log.Printf("%s: %v\n", logTag, err)
34-
util.WriteBackError(w, "Can't read request body", http.StatusBadRequest)
31+
err, body, waitForCompletion, done := reindexConfigResponse(req, w)
32+
if done {
3533
return
3634
}
37-
defer req.Body.Close()
3835

39-
var body reindexConfig
40-
err = json.Unmarshal(reqBody, &body)
41-
if err != nil {
42-
log.Printf("%s: %v\n", logTag, err)
43-
util.WriteBackError(w, "Can't parse request body", http.StatusBadRequest)
44-
return
45-
}
36+
response, err := reindex(req.Context(), indexName, &body, waitForCompletion, "")
37+
errorHandler(err, w, response)
38+
}
39+
}
4640

47-
// By default, wait_for_completion = true
48-
param := req.URL.Query().Get("wait_for_completion")
49-
if param == "" {
50-
param = "true"
41+
func (rx *reindexer) reindexSrcToDest() http.HandlerFunc {
42+
return func(w http.ResponseWriter, req *http.Request) {
43+
vars := mux.Vars(req)
44+
sourceIndex, okS := vars["source_index"]
45+
destinationIndex, okD := vars["destination_index"]
46+
if checkVar(okS, w, "source_index") {
47+
return
5148
}
52-
waitForCompletion, err := strconv.ParseBool(param)
53-
if err != nil {
54-
log.Printf("%s: %v", logTag, err)
55-
util.WriteBackError(w, err.Error(), http.StatusBadRequest)
49+
if checkVar(okD, w, "destination_index") {
5650
return
5751
}
58-
59-
response, err := reindex(req.Context(), indexName, &body, waitForCompletion)
60-
if err != nil {
61-
log.Printf("%s: %v\n", logTag, err)
62-
util.WriteBackError(w, err.Error(), http.StatusNotFound)
52+
err, body, waitForCompletion, done := reindexConfigResponse(req, w)
53+
if done {
6354
return
6455
}
6556

66-
util.WriteBackRaw(w, response, http.StatusOK)
57+
response, err := reindex(req.Context(), sourceIndex, &body, waitForCompletion, destinationIndex)
58+
errorHandler(err, w, response)
59+
}
60+
}
61+
62+
func errorHandler(err error, w http.ResponseWriter, response []byte) {
63+
if err != nil {
64+
log.Printf("%s: %v\n", logTag, err)
65+
util.WriteBackError(w, err.Error(), http.StatusNotFound)
66+
return
67+
}
68+
69+
util.WriteBackRaw(w, response, http.StatusOK)
70+
}
71+
72+
func checkVar(okS bool, w http.ResponseWriter, variable string) bool {
73+
if !okS {
74+
util.WriteBackError(w, "Route inconsistency, expecting var "+variable, http.StatusInternalServerError)
75+
return true
76+
}
77+
return false
78+
}
79+
80+
func reindexConfigResponse(req *http.Request, w http.ResponseWriter) (error, reindexConfig, bool, bool) {
81+
reqBody, err := ioutil.ReadAll(req.Body)
82+
if err != nil {
83+
log.Printf("%s: %v\n", logTag, err)
84+
util.WriteBackError(w, "Can't read request body", http.StatusBadRequest)
85+
return nil, reindexConfig{}, false, true
86+
}
87+
defer req.Body.Close()
88+
89+
var body reindexConfig
90+
err = json.Unmarshal(reqBody, &body)
91+
if err != nil {
92+
log.Printf("%s: %v\n", logTag, err)
93+
util.WriteBackError(w, "Can't parse request body", http.StatusBadRequest)
94+
return nil, reindexConfig{}, false, true
95+
}
96+
97+
// By default, wait_for_completion = true
98+
param := req.URL.Query().Get("wait_for_completion")
99+
if param == "" {
100+
param = "true"
101+
}
102+
waitForCompletion, err := strconv.ParseBool(param)
103+
if err != nil {
104+
log.Printf("%s: %v", logTag, err)
105+
util.WriteBackError(w, err.Error(), http.StatusBadRequest)
106+
return nil, reindexConfig{}, false, true
67107
}
108+
return err, body, waitForCompletion, false
68109
}

plugins/reindexer/routes.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,13 @@ import (
99
func (rx *reindexer) routes() []plugins.Route {
1010
middleware := (&chain{}).Wrap
1111
routes := []plugins.Route{
12+
{
13+
Name: "Reindex source to destination",
14+
Methods: []string{http.MethodPost},
15+
Path: "/_reindex/{source_index}/{destination_index}",
16+
HandlerFunc: middleware(rx.reindexSrcToDest()),
17+
Description: "Reindexes an index to a provided destination index with optionally provided mappings, settings and data.",
18+
},
1219
{
1320
Name: "Reindex",
1421
Methods: []string{http.MethodPost},

0 commit comments

Comments
 (0)