-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.go
179 lines (154 loc) · 4.22 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
package main
import (
"context"
"errors"
"flag"
"github.com/guoyk93/esbridge/tasks"
gzip "github.com/klauspost/pgzip"
"github.com/olivere/elastic"
"github.com/tencentyun/cos-go-sdk-v5"
"log"
"net/http"
"net/url"
"os"
"strings"
_ "net/http/pprof"
)
var (
conf Conf
optConf string
optMigrate string
optRestore string
optSearch string
optNoDelete bool
optBatchSize int
optConcurrency int
optNeo bool
optBestCompression bool
optBestSpeed bool
)
func load() (err error) {
flag.BoolVar(&optNeo, "neo", false, "neo")
flag.StringVar(&optConf, "conf", "/etc/esbridge.yml", "配置文件")
flag.StringVar(&optMigrate, "migrate", "", "要迁移的离线索引, ")
flag.StringVar(&optRestore, "restore", "", "要恢复的离线索引, 格式为 INDEX/PROJECT")
flag.StringVar(&optSearch, "search", "", "要搜索的关键字")
flag.IntVar(&optBatchSize, "batch-size", 2000, "导出时的每批次大小")
flag.IntVar(&optConcurrency, "concurrency", 3, "导出时的并发数")
flag.BoolVar(&optNoDelete, "no-delete", false, "迁移时不删除索引,仅用于测试")
flag.BoolVar(&optBestCompression, "best-compression", false, "最佳压缩率")
flag.BoolVar(&optBestSpeed, "best-speed", false, "最佳压缩速度")
flag.Parse()
optConf = strings.TrimSpace(optConf)
optMigrate = strings.TrimSpace(optMigrate)
optRestore = strings.TrimSpace(optRestore)
optSearch = strings.TrimSpace(optSearch)
if conf, err = LoadConf(optConf); err != nil {
return
}
return
}
func checkIndex(index string) error {
if strings.Contains(index, "*") || strings.Contains(index, "?") {
return errors.New("不允许在索引名中包含 '*' 或者 '?'")
}
return nil
}
func exit(err *error) {
if *err != nil {
log.Printf("exited with error: %s", (*err).Error())
os.Exit(1)
} else {
log.Println("exited")
}
}
func main() {
var err error
defer exit(&err)
if err = load(); err != nil {
return
}
// pprof
go func() {
log.Print(http.ListenAndServe(conf.PProf.Bind, nil))
}()
// setup es
var clientES *elastic.Client
if clientES, err = elastic.NewClient(
elastic.SetURL(conf.Elasticsearch.URL),
elastic.SetGzip(true),
elastic.SetSniff(false),
); err != nil {
return
}
// setup cos
var clientCOS *cos.Client
u, _ := url.Parse(conf.COS.URL)
b := &cos.BaseURL{BucketURL: u}
clientCOS = cos.NewClient(b, &http.Client{Transport: &cos.AuthorizationTransport{SecretID: conf.COS.SecretID, SecretKey: conf.COS.SecretKey}})
switch {
case optMigrate != "":
index := optMigrate
if err = checkIndex(index); err != nil {
return
}
if optNeo {
if err = tasks.IndexMigrateNeo(tasks.IndexMigrateOptions{
ESClient: clientES,
COSClient: clientCOS,
NoDelete: optNoDelete,
Dir: conf.Workspace,
Index: index,
BatchSize: optBatchSize,
Concurrency: optConcurrency,
CompressionLevel: gzip.BestCompression,
}).Do(context.Background()); err != nil {
return
}
} else {
if err = tasks.IndexMigrate(tasks.IndexMigrateOptions{
ESClient: clientES,
COSClient: clientCOS,
NoDelete: optNoDelete,
Dir: conf.Workspace,
Index: index,
BatchSize: optBatchSize,
Concurrency: optConcurrency,
CompressionLevel: gzip.BestCompression,
}).Do(context.Background()); err != nil {
return
}
}
case optRestore != "":
ss := strings.Split(optRestore, "/")
if len(ss) != 2 {
err = errors.New("参数错误")
return
}
index, project := strings.TrimSpace(ss[0]), strings.TrimSpace(ss[1])
if index == "" || project == "" {
err = errors.New("参数缺失")
return
}
if err = checkIndex(index); err != nil {
return
}
if err = COSCheckFile(clientCOS, index, project); err != nil {
return
}
if err = ElasticsearchTouchIndex(clientES, index); err != nil {
return
}
if err = ElasticsearchTuneForRecoveryStart(clientES, index); err != nil {
return
}
defer ElasticsearchTuneForRecoveryEnd(clientES, index)
if err = COSImportToES(clientCOS, index, project, clientES); err != nil {
return
}
case optSearch != "":
if err = COSSearch(clientCOS, optSearch); err != nil {
return
}
}
}