The code in this article comes from the official examples.
I. Configuration
This example demonstrates how to configure the client's Transport. The settings shown here only illustrate what each option does; they are not suitable for production. The default transport is usually good enough.
package main
import (
"crypto/tls"
"fmt"
"net"
"net/http"
"time"

"github.com/elastic/go-elasticsearch/v8"
)
func main() {
cfg := elasticsearch.Config{
Addresses: []string{"http://localhost:9200"},
Transport: &http.Transport{
MaxIdleConnsPerHost: 10,
ResponseHeaderTimeout: time.Millisecond,
DialContext: (&net.Dialer{Timeout: time.Nanosecond}).DialContext,
TLSClientConfig: &tls.Config{
MinVersion: tls.VersionTLS11,
},
},
}
es, err := elasticsearch.NewClient(cfg)
if err != nil {
panic(err)
}
resp, err := es.Info()
if err != nil {
panic(err)
}
fmt.Println(resp)
// => panic: dial tcp: i/o timeout
}
II. Custom Transport
A custom transport can be used to read or manipulate requests and responses, implement custom logging, pass custom headers to requests, and so on.
CountingTransport adds a custom header to each request, records information about the request and response, and counts the number of requests. Because it implements the http.RoundTripper interface, it can be passed to the client as a custom HTTP transport implementation.
type CountingTransport struct {
count uint64
}
// RoundTrip sends the request and returns the response
func (t *CountingTransport) RoundTrip(req *http.Request) (*http.Response, error) {
var buf bytes.Buffer
atomic.AddUint64(&t.count, 1)
req.Header.Set("Accept", "application/yaml")
req.Header.Set("X-Request-ID", "foo-123")
res, err := http.DefaultTransport.RoundTrip(req)
buf.WriteString(strings.Repeat("-", 80) + "\n")
fmt.Fprintf(&buf, "%s %s", req.Method, req.URL.String())
if err != nil {
fmt.Fprintf(&buf, "ERROR: %s\n", err)
} else {
fmt.Fprintf(&buf, "[%s] %s\n", res.Status, res.Header.Get("Content-Type"))
}
buf.WriteTo(os.Stdout)
return res, err
}
func main() {
var wg sync.WaitGroup
// Create the custom transport
tp := new(CountingTransport)
// Pass the custom transport in the client configuration
es, _ := elasticsearch.NewClient(elasticsearch.Config{Transport: tp})
for i := 0; i < 25; i++ {
wg.Add(1)
go func() {
defer wg.Done()
es.Info()
}()
}
wg.Wait()
fmt.Println(strings.Repeat("=", 80))
fmt.Printf("%80s\n", fmt.Sprintf("Total Requests: %d", atomic.LoadUint64(&tp.count)))
}
III. Logging
1 Using the built-in loggers
Four loggers are available out of the box:
- estransport.TextLogger
- estransport.ColorLogger
- estransport.CurlLogger
- estransport.JSONLogger
func main() {
log.SetFlags(0)
var es *elasticsearch.Client
es, _ = elasticsearch.NewClient(elasticsearch.Config{
Logger: &estransport.TextLogger{Output: os.Stdout},
})
run(es, "Text")
es, _ = elasticsearch.NewClient(elasticsearch.Config{
Logger: &estransport.ColorLogger{Output: os.Stdout},
})
run(es, "Color")
es, _ = elasticsearch.NewClient(elasticsearch.Config{
Logger: &estransport.ColorLogger{
Output: os.Stdout,
EnableRequestBody: true,
EnableResponseBody: true,
},
})
run(es, "Request/Response body")
es, _ = elasticsearch.NewClient(elasticsearch.Config{
Logger: &estransport.CurlLogger{
Output: os.Stdout,
EnableRequestBody: true,
EnableResponseBody: true,
},
})
run(es, "Curl")
es, _ = elasticsearch.NewClient(elasticsearch.Config{
Logger: &estransport.JSONLogger{
Output: os.Stdout,
},
})
run(es, "JSON")
}
func run(es *elasticsearch.Client, name string) {
log.Println("███", fmt.Sprintf("\x1b[1m%s\x1b[0m", name), strings.Repeat("█", 75-len(name)))
es.Delete("test", "1")
es.Exists("test", "1")
es.Index(
"test",
strings.NewReader(`{"title": "logging"}`),
es.Index.WithRefresh("true"),
es.Index.WithPretty(),
es.Index.WithFilterPath("result", "_id"),
)
es.Search(es.Search.WithQuery("{FAIL"))
res, err := es.Search(
es.Search.WithIndex("test"),
es.Search.WithBody(strings.NewReader(`{"query": {"match": {"title": "logging"}}}`)),
es.Search.WithSize(1),
es.Search.WithPretty(),
es.Search.WithFilterPath("took", "hits.hits"),
)
s := res.String()
if len(s) <= len("[200 OK] ") {
log.Fatal("Response body is empty")
}
if err != nil {
log.Fatalf("Error: %s", err)
}
log.Println()
}
What the four loggers are for:
- TextLogger: prints basic information about the request and response as plain text
- ColorLogger: prints information in different colors in the terminal, which is convenient during development
- CurlLogger: formats the information as runnable curl commands, and pretty-prints the response when EnableResponseBody is enabled
- JSONLogger: prints the information as JSON, suitable for production logging
To log the request and response bodies, the corresponding options must be enabled:
- EnableRequestBody: logs the request body
- EnableResponseBody: logs the response body
2 Custom logger
Implement a custom logger based on the estransport.Logger interface. The rs/zerolog package is used for the logging.
package main
import (
"io"
"io/ioutil"
"net/http"
"os"
"strings"
"time"

"github.com/elastic/go-elasticsearch/v8"
"github.com/rs/zerolog"
)
// CustomLogger implements the estransport.Logger interface
type CustomLogger struct {
zerolog.Logger
}
// LogRoundTrip prints some information about the request and response
func (l *CustomLogger) LogRoundTrip(
req *http.Request,
res *http.Response,
err error,
start time.Time,
dur time.Duration,
) error {
var (
e *zerolog.Event
nReq int64
nRes int64
)
// Set the log level
switch {
case err != nil:
e = l.Error()
case res != nil && res.StatusCode > 0 && res.StatusCode < 300:
e = l.Info()
case res != nil && res.StatusCode > 299 && res.StatusCode < 500:
e = l.Warn()
case res != nil && res.StatusCode > 499:
e = l.Error()
default:
e = l.Error()
}
// Count the number of bytes in the request and response bodies
if req != nil && req.Body != nil && req.Body != http.NoBody {
nReq, _ = io.Copy(ioutil.Discard, req.Body)
}
if res != nil && res.Body != nil && res.Body != http.NoBody {
nRes, _ = io.Copy(ioutil.Discard, res.Body)
}
// Log the event
e.Str("method", req.Method).
Int("status_code", res.StatusCode).
Dur("duration", dur).
Int64("req_bytes", nReq).
Int64("res_bytes", nRes).
Msg(req.URL.String())
return nil
}
// RequestBodyEnabled enables logging of the request body
func (l *CustomLogger) RequestBodyEnabled() bool { return true }
// ResponseBodyEnabled enables logging of the response body
func (l *CustomLogger) ResponseBodyEnabled() bool { return true }
func main() {
// Set up the logger
log := zerolog.New(zerolog.ConsoleWriter{Out: os.Stderr}).
Level(zerolog.InfoLevel).
With().
Timestamp().
Logger()
// Use the custom logger in the client
es, _ := elasticsearch.NewClient(elasticsearch.Config{
Logger: &CustomLogger{log},
})
{
es.Delete("test", "1")
es.Exists("test", "1")
es.Index("test", strings.NewReader(`{"title": "logging"}`), es.Index.WithRefresh("true"))
es.Search(
es.Search.WithQuery("{FAIL"),
)
es.Search(
es.Search.WithIndex("test"),
es.Search.WithBody(strings.NewReader(`{"query": {"match": {"title": "logging"}}}`)),
es.Search.WithSize(1),
)
}
}
結(jié)果如圖:
IV. Bulk Indexing
1 Default
This example deliberately avoids any abstractions or helpers in order to show the low-level mechanics of using the Bulk API: preparing the metadata payloads, sending the payloads in batches, checking for error results, and printing a report.
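Before the full example, here is a minimal sketch (with made-up documents) of the newline-delimited payload that the loop below accumulates in a bytes.Buffer and sends as the body of the Bulk request:
package main

import (
	"bytes"
	"fmt"
)

func main() {
	// Every document contributes two newline-terminated lines to the bulk body:
	// an action/metadata line, followed by the document source itself.
	var buf bytes.Buffer
	buf.WriteString(`{ "index" : { "_id" : "1" } }` + "\n")
	buf.WriteString(`{"id":1,"title":"Title 1"}` + "\n")
	buf.WriteString(`{ "index" : { "_id" : "2" } }` + "\n")
	buf.WriteString(`{"id":2,"title":"Title 2"}` + "\n")

	// The accumulated buffer is what gets passed to es.Bulk() as the request body.
	fmt.Print(buf.String())
}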
See the comments in the code:
package main
import (
"bytes"
"encoding/json"
"flag"
"fmt"
"log"
"math/rand"
"strconv"
"strings"
"time"

"github.com/dustin/go-humanize"
"github.com/elastic/go-elasticsearch/v8"
"github.com/elastic/go-elasticsearch/v8/esapi"
)
type Article struct {
ID int `json:"id"`
Title string `json:"title"`
Body string `json:"body"`
Published time.Time `json:"published"`
Author Author `json:"author"`
}
type Author struct {
FirstName string `json:"first_name"`
LastName string `json:"last_name"`
}
var (
_ = fmt.Print
count int
batch int
)
func init() {
flag.IntVar(&count, "count", 1000, "Number of documents to generate")
flag.IntVar(&batch, "batch", 255, "Number of documents to send per batch")
flag.Parse()
rand.Seed(time.Now().UnixNano())
}
func main() {
log.SetFlags(0)
type bulkResponse struct {
Errors bool `json:"errors"`
Items []struct {
Index struct {
ID string `json:"_id"`
Result string `json:"result"`
Status int `json:"status"`
Error struct {
Type string `json:"type"`
Reason string `json:"reason"`
Cause struct {
Type string `json:"type"`
Reason string `json:"reason"`
} `json:"caused_by"`
} `json:"error"`
} `json:"index"`
} `json:"items"`
}
var (
buf bytes.Buffer
res *esapi.Response
err error
raw map[string]interface{}
blk *bulkResponse
articles []*Article
indexName = "articles"
numItems int
numErrors int
numIndexed int
numBatches int
currBatch int
)
log.Printf(
"\x1b[1mBulk\x1b[0m: documents [%s] batch size [%s]",
humanize.Comma(int64(count)), humanize.Comma(int64(batch)))
log.Println(strings.Repeat("_", 65))
// Create the client
es, err := elasticsearch.NewDefaultClient()
if err != nil {
panic(err)
}
// Generate the articles
names := []string{"Alice", "John", "Mary"}
for i := 1; i < count+1; i++ {
articles = append(articles, &Article{
ID: i,
Title: strings.Join([]string{"Title", strconv.Itoa(i)}, " "),
Body: "Lorem ipsum dolor sit amet...",
Published: time.Now().Round(time.Second).Local().AddDate(0, 0, i),
Author: Author{
FirstName: names[rand.Intn(len(names))],
LastName: "Smith",
},
})
}
log.Printf("→ Generated %s articles", humanize.Comma(int64(len(articles))))
fmt.Println("→ Sending batch ")
// Re-create the index
if res, err = es.Indices.Delete([]string{indexName}); err != nil {
log.Fatalf("Cannot delete index: %s", err)
}
res, err = es.Indices.Create(indexName)
if err != nil {
log.Fatalf("Cannot create index: %s", err)
}
if res.IsError() {
log.Fatalf("Cannot create index: %s", res)
}
if count%batch == 0 {
numBatches = count / batch
} else {
numBatches = count/batch + 1
}
start := time.Now().Local()
// Loop over the collection
for i, a := range articles {
numItems++
currBatch = i / batch
if i == count-1 {
currBatch++
}
// Prepare the metadata payload
meta := []byte(fmt.Sprintf(`{ "index" : { "_id" : "%d" } }%s`, a.ID, "\n"))
// Prepare the data payload: the serialized article
data, err := json.Marshal(a)
if err != nil {
log.Fatalf("Cannot encode article %d: %s", a.ID, err)
}
// Append a newline to the data payload
data = append(data, "\n"...)
// Append both payloads to buf
buf.Grow(len(meta) + len(data))
buf.Write(meta)
buf.Write(data)
// When the threshold is reached, execute the Bulk() request with the data in buf as the request body
if i > 0 && i%batch == 0 || i == count-1 {
fmt.Printf("[%d/%d] ", currBatch, numBatches)
// Send one batch (255 documents in this example) at a time
res, err = es.Bulk(bytes.NewReader(buf.Bytes()), es.Bulk.WithIndex(indexName))
if err != nil {
log.Fatalf("Failure indexing batch %d: %s", currBatch, err)
}
// If the whole request failed, print the error and mark all documents as failed
if res.IsError() {
numErrors += numItems
if err := json.NewDecoder(res.Body).Decode(&raw); err != nil {
log.Fatalf("Failure to parse response body: %s", err)
} else {
log.Printf(" Error: [%d] %s: %s",
res.StatusCode,
raw["error"].(map[string]interface{})["type"],
raw["error"].(map[string]interface{})["reason"],
)
}
} else { // A successful response can still contain errors for some individual documents
if err := json.NewDecoder(res.Body).Decode(&blk); err != nil {
log.Fatalf("Failure to parse response body: %s", err)
} else {
for _, d := range blk.Items {
// Handle any item whose status code is greater than 201
if d.Index.Status > 201 {
numErrors++
log.Printf(" Error: [%d]: %s: %s: %s: %s",
d.Index.Status,
d.Index.Error.Type,
d.Index.Error.Reason,
d.Index.Error.Cause.Type,
d.Index.Error.Cause.Reason,
)
} else {
// For status codes of 201 or lower, increment the success counter numIndexed
numIndexed++
}
}
}
}
// Close the response body to avoid hitting goroutine or file handle limits
res.Body.Close()
// Reset buf and the items counter
buf.Reset()
numItems = 0
}
}
// Report the results: number of documents indexed, number of errors, duration, indexing rate
fmt.Println()
log.Println(strings.Repeat("▔", 65))
dur := time.Since(start)
if numErrors > 0 {
log.Fatalf(
"Indexed [%s] documents with [%s] errors in %s (%s docs/sec)",
humanize.Comma(int64(numIndexed)),
humanize.Comma(int64(numErrors)),
dur.Truncate(time.Millisecond),
humanize.Comma(int64(1000.0/float64(dur/time.Millisecond)*float64(numIndexed))),
)
} else {
log.Printf(
"Successfully indexed [%s] documents in %s (%s docs/sec)",
humanize.Comma(int64(numIndexed)),
dur.Truncate(time.Millisecond),
humanize.Comma(int64(1000.0/float64(dur/time.Millisecond)*float64(numIndexed))),
)
}
}
count and batch are optional parameters that can be customized at run time:
$ go run main.go -count=100000 -batch=25000
Bulk: documents [100,000] batch size [25,000]
▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁
→ Generated 100,000 articles
→ Sending batch
[1/4] [2/4] [3/4] [4/4]
▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔
Successfully indexed [100,000] documents in 2.79s (35,842 docs/sec)
2 Indexer
This example demonstrates indexing documents with the esutil.BulkIndexer helper.
package main
import (
"bytes"
"context"
"encoding/json"
"flag"
"log"
"math/rand"
"runtime"
"strconv"
"strings"
"sync/atomic"
"time"

"github.com/cenkalti/backoff/v4"
"github.com/dustin/go-humanize"
"github.com/elastic/go-elasticsearch/v8"
"github.com/elastic/go-elasticsearch/v8/esapi"
"github.com/elastic/go-elasticsearch/v8/esutil"
)
type Article struct {
ID int `json:"id"`
Title string `json:"title"`
Body string `json:"body"`
Published time.Time `json:"published"`
Author Author `json:"author"`
}
type Author struct {
FirstName string `json:"first_name"`
LastName string `json:"last_name"`
}
var (
indexName string
numWorkers int
flushBytes int
numItems int
)
func init() {
flag.StringVar(&indexName, "index", "test-bulk-example", "Index name")
flag.IntVar(&numWorkers, "workers", runtime.NumCPU(), "Number of indexer workers")
flag.IntVar(&flushBytes, "flush", 5e+6, "Flush threshold in bytes")
flag.IntVar(&numItems, "count", 10000, "Number of documents to generate")
flag.Parse()
rand.Seed(time.Now().UnixNano())
}
func main() {
log.SetFlags(0)
var (
articles []*Article
countSuccessful uint64
res *esapi.Response
err error
)
log.Printf(
"\x1b[1mBulkIndexer\x1b[0m: documents [%s] workers [%d] flush [%s]",
humanize.Comma(int64(numItems)), numWorkers, humanize.Bytes(uint64(flushBytes)))
log.Println(strings.Repeat("▁", 65))
// Use a third-party package for the backoff implementation
retryBackoff := backoff.NewExponentialBackOff()
// Create the client. For the best performance, consider a third-party HTTP package, as shown in the benchmarks example
es, err := elasticsearch.NewClient(elasticsearch.Config{
// 429 Too Many Requests
RetryOnStatus: []int{502, 503, 504, 429},
// Configure the backoff function
RetryBackoff: func(attempt int) time.Duration {
if attempt == 1 {
retryBackoff.Reset()
}
return retryBackoff.NextBackOff()
},
// Retry up to 5 times
MaxRetries: 5,
})
if err != nil {
log.Fatalf("Error creating the client: %s", err)
}
// Create the BulkIndexer. For the best performance, consider a third-party JSON package, as shown in the benchmarks example
bi, err := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
Index: indexName, // The default index name
Client: es, // The Elasticsearch client
NumWorkers: numWorkers, // The number of worker goroutines
FlushBytes: int(flushBytes), // The flush threshold in bytes
FlushInterval: 30 * time.Second, // The periodic flush interval
})
if err != nil {
log.Fatalf("Error creating the indexer: %s", err)
}
// Generate the articles
names := []string{"Alice", "John", "Mary"}
for i := 1; i <= numItems; i++ {
articles = append(articles, &Article{
ID: i,
Title: strings.Join([]string{"Title", strconv.Itoa(i)}, " "),
Body: "Lorem ipsum dolor sit amet...",
Published: time.Now().Round(time.Second).UTC().AddDate(0, 0, i),
Author: Author{
FirstName: names[rand.Intn(len(names))],
LastName: "Smith",
},
})
}
log.Printf("→ Generated %s articles", humanize.Comma(int64(len(articles))))
// Re-create the index
if res, err = es.Indices.Delete([]string{indexName}, es.Indices.Delete.WithIgnoreUnavailable(true)); err != nil || res.IsError() {
log.Fatalf("Cannot delete index: %s", err)
}
res.Body.Close()
res, err = es.Indices.Create(indexName)
if err != nil {
log.Fatalf("Cannot create index: %s", err)
}
if res.IsError() {
log.Fatalf("Cannot create index: %s", res)
}
res.Body.Close()
start := time.Now().UTC()
// Loop over the collection
for _, a := range articles {
// Prepare the data payload: the serialized article
data, err := json.Marshal(a)
if err != nil {
log.Fatalf("Cannot encode article %d: %s", a.ID, err)
}
// Add an item to the BulkIndexer
err = bi.Add(
context.Background(),
esutil.BulkIndexerItem{
// Action configures the operation to perform (index, create, delete, update)
Action: "index",
// DocumentID is the (optional) document ID
DocumentID: strconv.Itoa(a.ID),
// Body is an io.Reader with the payload
Body: bytes.NewReader(data),
// OnSuccess is called for each successful operation
OnSuccess: func(c context.Context, bii esutil.BulkIndexerItem, biri esutil.BulkIndexerResponseItem) {
atomic.AddUint64(&countSuccessful, 1)
},
// OnFailure is called for each failed operation
OnFailure: func(c context.Context, bii esutil.BulkIndexerItem, biri esutil.BulkIndexerResponseItem, e error) {
if err != nil {
log.Printf("ERROR: %s", err)
} else {
log.Printf("ERROR: %s: %s", biri.Error.Type, biri.Error.Reason)
}
},
},
)
if err != nil {
log.Fatalf("Unexpected error: %s", err)
}
}
// Close the indexer
if err := bi.Close(context.Background()); err != nil {
log.Fatalf("Unexpected error: %s", err)
}
biStats := bi.Stats()
// Report the results: number of documents indexed, number of errors, duration, indexing rate
log.Println(strings.Repeat("▔", 65))
dur := time.Since(start)
if biStats.NumFailed > 0 {
log.Fatalf(
"Indexed [%s] documents with [%s] errors in %s (%s docs/sec)",
humanize.Comma(int64(biStats.NumFlushed)),
humanize.Comma(int64(biStats.NumFailed)),
dur.Truncate(time.Millisecond),
humanize.Comma(int64(1000.0/float64(dur/time.Millisecond)*float64(biStats.NumFlushed))),
)
} else {
log.Printf(
"Successfully indexed [%s] documents in %s (%s docs/sec)",
humanize.Comma(int64(biStats.NumFlushed)),
dur.Truncate(time.Millisecond),
humanize.Comma(int64(1000.0/float64(dur/time.Millisecond)*float64(biStats.NumFlushed))),
)
}
}
There are four optional parameters; see the init() function:
$ go run indexer.go --workers=8 --count=100000 --flush=1000000
BulkIndexer: documents [100,000] workers [8] flush [1.0 MB]
▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁
→ Generated 100,000 articles
▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔
Successfully indexed [100,000] documents in 1.584s (63,131 docs/sec)
V. Encoding
This example demonstrates how to use helper methods and third-party JSON libraries.
1 Third-party JSON libraries
1.1 tidwall/gjson
The github.com/tidwall/gjson library makes it easy to access attributes without unmarshaling the payload into a data structure.
package main
import (
"bytes"
"fmt"
"io"
"log"
"strings"

"github.com/elastic/go-elasticsearch/v8"
"github.com/fatih/color"
"github.com/tidwall/gjson"
)
var (
faint = color.New(color.Faint)
bold = color.New(color.Bold)
)
func init() {
log.SetFlags(0)
}
func main() {
es, err := elasticsearch.NewDefaultClient()
if err != nil {
log.Fatalf("Error creating client: %s", err)
}
res, err := es.Cluster.Stats(es.Cluster.Stats.WithHuman())
if err != nil {
log.Fatalf("Error getting response: %s", err)
}
defer res.Body.Close()
json := read(res.Body)
fmt.Println(strings.Repeat("-", 50))
faint.Print("cluster ")
// Get the cluster name
bold.Print(gjson.Get(json, "cluster_name"))
faint.Print(" status=")
// Get the cluster health status
status := gjson.Get(json, "status")
switch status.Str {
case "green":
bold.Add(color.FgHiGreen).Print(status)
case "yellow":
bold.Add(color.FgHiYellow).Print(status)
case "red":
bold.Add(color.FgHiRed).Print(status)
default:
bold.Add(color.FgHiRed, color.Underline).Print(status)
}
fmt.Println("\n" + strings.Repeat("-", 50))
stats := []string{
"indices.count",
"indices.docs.count",
"indices.store.size",
"nodes.count.total",
"nodes.os.mem.used_percent",
"nodes.process.cpu.percent",
"nodes.jvm.versions.#.version",
"nodes.jvm.mem.heap_used",
"nodes.jvm.mem.heap_max",
"nodes.fs.free",
}
var maxWidth int
for _, item := range stats {
if len(item) > maxWidth {
maxWidth = len(item)
}
}
for _, item := range stats {
pad := maxWidth - len(item)
fmt.Print(strings.Repeat(" ", pad))
faint.Printf("%s |", item)
// Get the stats from the JSON dynamically
fmt.Printf(" %s\n", gjson.Get(json, item))
}
fmt.Println()
}
func read(r io.Reader) string {
var b bytes.Buffer
b.ReadFrom(r)
return b.String()
}
go run main.go
1.2 mailru/easyjson
mailru/easyjson generates encoding and decoding code for the structs you provide. The example project's layout does not fit well into this article, so it is covered in a separate post; see: the easyjson example for Elasticsearch.
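As a rough idea of the workflow (a minimal sketch of my own, not code from that article): annotate a struct, run the easyjson generator, and then marshal with the generated methods instead of the reflection-based encoding/json:
package main

import (
	"fmt"
	"log"

	"github.com/mailru/easyjson"
)

// Running `easyjson -all main.go` generates main_easyjson.go with
// MarshalEasyJSON/UnmarshalEasyJSON methods for this struct, so that it
// satisfies the easyjson.Marshaler and easyjson.Unmarshaler interfaces.
//easyjson:json
type Article struct {
	ID    int    `json:"id"`
	Title string `json:"title"`
}

func main() {
	// Encode with the generated code; this only compiles after running the generator.
	data, err := easyjson.Marshal(&Article{ID: 1, Title: "Test"})
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(string(data))
}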
1.3 The JSONReader in esutil
The esutil.NewJSONReader() helper converts a struct, a map, or any other serializable object into JSON wrapped in a reader, which can then be passed to the WithBody() method:
type Document struct{ Title string }
doc := Document{Title: "Test"}
es.Search(es.Search.WithBody(esutil.NewJSONReader(&doc)))
Full example:
package main
import (
"log"

"github.com/elastic/go-elasticsearch/v8"
"github.com/elastic/go-elasticsearch/v8/esapi"
"github.com/elastic/go-elasticsearch/v8/esutil"
)
func init() {
log.SetFlags(0)
}
func main() {
var (
res *esapi.Response
err error
)
es, err := elasticsearch.NewDefaultClient()
if err != nil {
log.Fatalf("Error creating the client: %s", err)
}
doc := struct {
Title string `json:"title"`
}{Title: "Test"}
res, err = es.Index("test", esutil.NewJSONReader(&doc), es.Index.WithRefresh("true"))
if err != nil {
log.Fatalf("Error getting response: %s", err)
}
log.Println(res)
query := map[string]interface{}{
"query": map[string]interface{}{
"match": map[string]interface{}{
"title": "test",
},
},
}
res, err = es.Search(
es.Search.WithIndex("test"),
es.Search.WithBody(esutil.NewJSONReader(&query)),
es.Search.WithPretty(),
)
if err != nil {
log.Fatalf("Error getting response: %s", err)
}
log.Println(res)
}
Output:
[201 Created] {"_index":"test","_type":"_doc","_id":"4l7aG3kBuWpKCVVn78cc","_version":1,"result":"created","forced_refresh":true,"_shards":{"total":2,"successful":1,"failed":0},"_seq_no":28,"_primary_term":4}
[200 OK] {
"took" : 18,
"timed_out" : false,
"_shards" : {
"total" : 1,
"successful" : 1,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : {
"value" : 1,
"relation" : "eq"
},
"max_score" : 2.3671236,
"hits" : [
{
"_index" : "test",
"_type" : "_doc",
"_id" : "4l7aG3kBuWpKCVVn78cc",
"_score" : 2.3671236,
"_source" : {
"title" : "Test"
}
}
]
}
}
1.4 Benchmark results for several JSON libraries
BenchmarkEncode/Article_-_json-8 671839 1677 ns/op 768 B/op 5 allocs/op
BenchmarkEncode/Article_-_JSONReader-8 712545 1685 ns/op 824 B/op 7 allocs/op
BenchmarkEncode/Article_-_easyjson-8 1503753 802.0 ns/op 760 B/op 6 allocs/op
BenchmarkEncode/map_-_json-8 934605 1279 ns/op 672 B/op 18 allocs/op
BenchmarkEncode/map_-_JSONReader-8 824247 1421 ns/op 728 B/op 20 allocs/op
BenchmarkDecode/Search_-_json-8 46322 25893 ns/op 9258 B/op 75 allocs/op
BenchmarkDecode/Search_-_easyjson-8 103856 11344 ns/op 12635 B/op 70 allocs/op
BenchmarkDecode/Cluster_-_json_-_map-8 23635 50752 ns/op 31603 B/op 385 allocs/op
BenchmarkDecode/Cluster_-_json_-_stc-8 38974 30788 ns/op 15160 B/op 20 allocs/op
BenchmarkDecode/Cluster_-_gjson-8 909138 1354 ns/op 256 B/op 3 allocs/op
mailru/easyjson and tidwall/gjson both outperform the standard library in their respective scenarios.
VI. Third-Party HTTP Libraries
This example demonstrates how to replace the default net/http transport with fasthttp, and benchmarks the two.
1 Example code
package http
import (
"io/ioutil"
"net/http"
"strings"
"github.com/valyala/fasthttp"
)
// Transport implements the Elasticsearch transport interface with fasthttp
type Transport struct{}
// RoundTrip sends the request and returns a response or an error
func (t *Transport) RoundTrip(req *http.Request) (*http.Response, error) {
freq := fasthttp.AcquireRequest()
defer fasthttp.ReleaseRequest(freq)
fres := fasthttp.AcquireResponse()
defer fasthttp.ReleaseResponse(fres)
t.copyRequest(freq, req)
err := fasthttp.Do(freq, fres)
if err != nil {
return nil, err
}
res := &http.Response{Header: make(http.Header)}
t.copyResponse(res, fres)
return res, nil
}
// copyRequest converts an http request into a fasthttp request
func (t *Transport) copyRequest(dst *fasthttp.Request, src *http.Request) *fasthttp.Request {
if src.Method == http.MethodGet && src.Body != nil {
src.Method = http.MethodPost
}
dst.SetHost(src.Host)
dst.SetRequestURI(src.URL.String())
dst.Header.SetRequestURI(src.URL.String())
dst.Header.SetMethod(src.Method)
for k, vs := range src.Header {
for _, v := range vs {
dst.Header.Set(k, v)
}
}
if src.Body != nil {
dst.SetBodyStream(src.Body, -1)
}
return dst
}
// copyResponse converts a fasthttp response into an http response
func (t *Transport) copyResponse(dst *http.Response, src *fasthttp.Response) *http.Response {
dst.StatusCode = src.StatusCode()
src.Header.VisitAll(func(key, value []byte) {
dst.Header.Set(string(key), string(value))
})
// Convert src.Body() into a string Reader, because the fasthttp response is released back to the pool (fasthttp.ReleaseResponse)
dst.Body = ioutil.NopCloser(strings.NewReader(string(src.Body())))
return dst
}
2 Benchmarks
Benchmark code:
package http_test
import (
"elasticsearch/fasthttp/http"
"testing"
"github.com/elastic/go-elasticsearch/v8"
)
func BenchmarkHTTPClient(b *testing.B) {
b.ReportAllocs()
client, err := elasticsearch.NewDefaultClient()
if err != nil {
b.Fatalf("ERROR: %s", err)
}
b.Run("Info()", func(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
if res, err := client.Info(); err != nil {
b.Errorf("Unexpected error when getting a response: %s", err)
} else {
res.Body.Close()
}
}
})
}
func BenchmarkFastHTTPClient(b *testing.B) {
b.ReportAllocs()
client, err := elasticsearch.NewClient(
elasticsearch.Config{Transport: &http.Transport{}},
)
if err != nil {
b.Fatalf("ERROR: %s", err)
}
b.Run("Info()", func(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
if res, err := client.Info(); err != nil {
b.Errorf("Unexpected error when getting a response: %s", err)
} else {
res.Body.Close()
}
}
})
}
結(jié)果:
...
BenchmarkHTTPClient/Info()-8 6067 2438072 ns/op 15908 B/op 120 allocs/op
...
BenchmarkFastHTTPClient/Info()-8 14690 811282 ns/op 2325 B/op 27 allocs/op
The performance of the standard net/http library keeps improving slowly, but there is still a sizable gap compared with fasthttp.
VII. Instrumentation
This example demonstrates how to instrument the ES client.
1 OpenCensus
Use ochttp.Transport to automatically instrument the client calls and print the collected information to the terminal.
package main
import (
"context"
"fmt"
"io"
"log"
"os"
"os/signal"
"sort"
"strings"
"time"

"github.com/elastic/go-elasticsearch/v8"
"github.com/fatih/color"
"go.opencensus.io/plugin/ochttp"
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"
"go.opencensus.io/trace"
"golang.org/x/crypto/ssh/terminal"
)
const count = 100
var (
tWidth, _, _ = terminal.GetSize(int(os.Stdout.Fd()))
faint = color.New(color.Faint)
bold = color.New(color.Bold)
boldRed = color.New(color.FgRed).Add(color.Bold)
tagMethod, _ = tag.NewKey("method")
)
func init() {
if tWidth < 0 {
tWidth = 0
}
}
// ConsoleExporter prints stats and traces to the terminal
type ConsoleExporter struct{}
// ExportView prints the stats
func (e *ConsoleExporter) ExportView(vd *view.Data) {
fmt.Println(strings.Repeat("─", tWidth))
for _, row := range vd.Rows {
faint.Print("█ ")
fmt.Printf("%-17s ", strings.TrimPrefix(vd.View.Name, "opencensus.io/http/client/"))
switch v := row.Data.(type) {
case *view.DistributionData:
bold.Printf("min=%-6.1f max=%-6.1f mean=%-6.1f", v.Min, v.Max, v.Mean)
case *view.CountData:
bold.Printf("count=%-3v", v.Value)
case *view.SumData:
bold.Printf("sum=%-3v", v.Value)
case *view.LastValueData:
bold.Printf("last=%-3v", v.Value)
}
faint.Print(" │ ")
for _, tag := range row.Tags {
faint.Printf("%-25s ", fmt.Sprintf("%v=%v", tag.Key.Name(), tag.Value))
}
fmt.Println()
}
}
// ExportSpan prints the traces
func (e *ConsoleExporter) ExportSpan(sd *trace.SpanData) {
var c *color.Color
if sd.Status.Code > 0 {
c = color.New(color.FgRed)
} else {
c = color.New(color.FgGreen)
}
fmt.Println(strings.Repeat("─", tWidth))
fmt.Printf(
"? %s %s %s\n",
c.Sprint(sd.Status.Message),
bold.Sprint(sd.Name),
sd.EndTime.Sub(sd.StartTime).Round(time.Millisecond))
faint.Printf("? %x > %x\n", sd.SpanContext.TraceID[:], sd.SpanContext.SpanID[:])
if len(sd.Attributes) > 0 {
faint.Print("? ")
var keys []string
for k := range sd.Attributes {
keys = append(keys, k)
}
sort.Strings(keys)
for i, k := range keys {
faint.Printf("%s=%v", k, sd.Attributes[k])
if i < len(keys)-1 {
faint.Printf(" │ ")
}
}
}
fmt.Println()
}
func main() {
log.SetFlags(0)
start := time.Now()
// Create new elasticsearch client ...
//
es, err := elasticsearch.NewClient(
elasticsearch.Config{
// >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
// ... wrap the client with the "ochttp" instrumentation
Transport: &ochttp.Transport{},
// <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<
})
if err != nil {
log.Fatalf("ERROR: %s", err)
}
// Create the done channel
//
done := make(chan os.Signal)
signal.Notify(done, os.Interrupt)
// Create the tickers
//
tickers := struct {
Info *time.Ticker
Index *time.Ticker
Health *time.Ticker
Search *time.Ticker
}{
Info: time.NewTicker(time.Second),
Index: time.NewTicker(500 * time.Millisecond),
Health: time.NewTicker(5 * time.Second),
Search: time.NewTicker(10 * time.Second),
}
defer tickers.Info.Stop()
defer tickers.Index.Stop()
defer tickers.Health.Stop()
defer tickers.Search.Stop()
// Register the views provided by the ochttp plugin
//
if err := view.Register(
ochttp.ClientRoundtripLatencyDistribution,
ochttp.ClientCompletedCount,
); err != nil {
log.Fatalf("ERROR: %s", err)
}
// Periodically report the views to STDOUT
//
view.SetReportingPeriod(5 * time.Second)
view.RegisterExporter(&ConsoleExporter{})
// Report a sample of the traces to STDOUT
//
trace.ApplyConfig(trace.Config{DefaultSampler: trace.ProbabilitySampler(0.5)})
trace.RegisterExporter(&ConsoleExporter{})
// Initialize the context
//
ctx, _ := tag.New(context.Background(), tag.Upsert(tagMethod, "main"))
// Call the APIs
//
for {
select {
case <-done:
fmt.Print("\n")
fmt.Println(strings.Repeat("━", tWidth))
faint.Printf("Finished in %s\n\n", time.Now().Sub(start).Truncate(time.Second))
return
// -> Info
//
case <-tickers.Info.C:
res, err := es.Info(es.Info.WithContext(ctx))
if err != nil {
boldRed.Printf("Error getting response: %s\n", err)
} else {
res.Body.Close()
}
// -> Index
//
case t := <-tickers.Index.C:
// Artificially fail some requests...
var body io.Reader
if t.Second()%4 == 0 {
body = strings.NewReader(``)
} else {
body = strings.NewReader(`{"timestamp":"` + t.Format(time.RFC3339) + `"}`)
}
res, err := es.Index("test", body, es.Index.WithContext(ctx))
if err != nil {
boldRed.Printf("Error getting response: %s\n", err)
} else {
res.Body.Close()
}
// -> Health
//
case <-tickers.Health.C:
res, err := es.Cluster.Health(
es.Cluster.Health.WithLevel("indices"),
es.Cluster.Health.WithContext(ctx),
)
if err != nil {
boldRed.Printf("Error getting response: %s\n", err)
} else {
res.Body.Close()
}
// -> Search
//
case <-tickers.Search.C:
res, err := es.Search(
es.Search.WithIndex("test"),
es.Search.WithSort("timestamp:desc"),
es.Search.WithSize(1),
es.Search.WithContext(ctx),
)
if err != nil {
boldRed.Printf("Error getting response: %s\n", err)
} else {
res.Body.Close()
}
}
}
}
2 Elastic APM
Instrument the client with the Go agent for Elastic APM:
- configure several types of transactions
- create custom spans within each transaction
- report errors
The example runs with Docker.
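The application code itself lives in the Docker image and is not reproduced here. As a rough idea of what the instrumentation looks like, here is a minimal sketch of my own (assuming the go.elastic.co/apm and apmelasticsearch module APIs, not the official demo code):
package main

import (
	"context"
	"log"
	"net/http"
	"strings"

	"github.com/elastic/go-elasticsearch/v8"
	"go.elastic.co/apm"
	"go.elastic.co/apm/module/apmelasticsearch"
)

func main() {
	// Wrap the default transport so every Elasticsearch request is reported
	// as a span attached to the transaction found in the request context.
	es, err := elasticsearch.NewClient(elasticsearch.Config{
		Transport: apmelasticsearch.WrapRoundTripper(http.DefaultTransport),
	})
	if err != nil {
		log.Fatalf("Error creating the client: %s", err)
	}

	// Start a transaction and store it in a context.
	tx := apm.DefaultTracer.StartTransaction("index-documents", "background")
	defer tx.End()
	ctx := apm.ContextWithTransaction(context.Background(), tx)

	// Calls made with this context show up as spans under the transaction.
	res, err := es.Index(
		"test",
		strings.NewReader(`{"title": "apm"}`),
		es.Index.WithContext(ctx),
	)
	if err != nil {
		// Report the error to APM.
		apm.CaptureError(ctx, err).Send()
		return
	}
	res.Body.Close()
}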
2.1 The docker-compose configuration file
File name: elasticstack.yml.
version: "3.2"
services:
# --- Application -----------------------------------------------------------
application:
container_name: application
build: .
networks: ["elasticstack"]
depends_on:
- elasticsearch
- kibana
- apm-server
# --- Elasticsearch ---------------------------------------------------------
elasticsearch:
image: elasticsearch:7.12.1
container_name: elasticsearch
volumes:
- es_data:/usr/share/elasticsearch/data:delegated
networks: ["elasticstack"]
environment:
- "cluster.name=go-elasticsearch-instrumentation"
- "cluster.routing.allocation.disk.threshold_enabled=false"
- "discovery.type=single-node"
- "bootstrap.memory_lock=true"
- "xpack.security.enabled=false"
- "ES_JAVA_OPTS=-Xms1g -Xmx1g"
expose:
- "9200"
ulimits:
memlock: -1
nproc: 65535
nofile: 65535
healthcheck:
test: curl --max-time 60 --retry 60 --retry-delay 1 --retry-connrefused --show-error --silent http://localhost:9200
# --- Kibana ----------------------------------------------------------------
kibana:
image: kibana:7.12.1
container_name: kibana
networks: ["elasticstack"]
environment:
- "ELASTICSEARCH_URL=http://elasticsearch:9200"
ports:
- "5601:5601"
depends_on: ["elasticsearch"]
healthcheck:
test: curl --max-time 60 --retry 60 --retry-delay 1 --retry-connrefused --show-error --silent http://localhost:5601
# --- APM Server ------------------------------------------------------------
apm-server:
image: elastic/apm-server:7.12.1
container_name: apm_server
networks: ["elasticstack"]
command: >
./apm-server run -e \
-E output.elasticsearch.hosts=http://elasticsearch:9200 \
-E setup.kibana.host=http://kibana:5601
expose:
- "8200"
depends_on: ["elasticsearch", "kibana"]
healthcheck:
test: curl --max-time 60 --retry 60 --retry-delay 1 --retry-connrefused --show-error --silent http://localhost:8200/healthcheck
networks:
elasticstack:
volumes:
es_data:
2.2 Dockerfile
FROM golang:1.16.3
RUN echo 'deb http://mirrors.aliyun.com/debian/ buster main non-free contrib\n\
deb http://mirrors.aliyun.com/debian-security buster/updates main\n\
deb http://mirrors.aliyun.com/debian/ buster-updates main non-free contrib\n\
deb http://mirrors.aliyun.com/debian/ buster-backports main non-free contrib\n'\
> /etc/apt/sources.list
RUN apt-get update && apt-get install -y gcc g++ ca-certificates make curl git jq
WORKDIR /go-elasticsearch-demo-instrumentation
RUN go env -w GO111MODULE=on && go env -w GOPROXY=https://goproxy.cn,direct
COPY go.mod .
RUN go mod download
ENV TERM xterm-256color
ENV ELASTICSEARCH_URL=${ELASTICSEARCH_URL:-http://elasticsearch:9200}
ENV ELASTIC_APM_SERVER_URL=${ELASTIC_APM_SERVER_URL:-http://apm_server:8200}
ENV ELASTIC_APM_SERVICE_NAME=go-elasticsearch-demo-instrumentation
ENV ELASTIC_APM_METRICS_INTERVAL=5s
ENV ELASTIC_APM_LOG_FILE=stderr
ENV ELASTIC_APM_LOG_LEVEL=debug
COPY apmelasticsearch.go opencensus.go ./
CMD go run apmelasticsearch.go
2.3 Running
Put elasticstack.yml and the Dockerfile in the same directory.
Run:
docker-compose --file elasticstack.yml up --build
Tear down the example's Docker resources:
docker-compose --file elasticstack.yml down --remove-orphans --volumes
VIII. Extension
This example demonstrates how to extend the client in order to call custom APIs.
The main.go example defines a custom type that embeds the regular client and adds a Custom namespace with an Example() method.
package main
import (
"io"
"log"
"net"
"net/http"
"net/http/httputil"
"net/url"
"os"

"github.com/elastic/go-elasticsearch/v8"
"github.com/elastic/go-elasticsearch/v8/esapi"
"github.com/elastic/go-elasticsearch/v8/estransport"
)
const port = "9209"
// ExtendedClient includes the regular APIs and the custom ones
type ExtendedClient struct {
*elasticsearch.Client
Custom *ExtendedAPI
}
// ExtendedAPI contains the custom APIs
type ExtendedAPI struct {
*elasticsearch.Client
}
// Example calls the custom REST API, "GET /_cat/example"
func (e *ExtendedAPI) Example() (*esapi.Response, error) {
req, _ := http.NewRequest("GET", "/_cat/example", nil)
res, err := e.Perform(req)
if err != nil {
return nil, err
}
return &esapi.Response{StatusCode: res.StatusCode, Body: res.Body, Header: res.Header}, nil
}
func main() {
log.SetFlags(0)
started := make(chan bool)
// Start the proxy server
go startServer(started)
ec, err := elasticsearch.NewClient(elasticsearch.Config{
Addresses: []string{"http://localhost:" + port},
Logger: &estransport.ColorLogger{Output: os.Stdout, EnableRequestBody: true, EnableResponseBody: true},
})
if err != nil {
log.Fatalf("Error creating the client: %s", err)
}
es := ExtendedClient{
Client: ec,
Custom: &ExtendedAPI{ec},
}
<-started
// Call a regular API
es.Cat.Health()
// Call the custom API
es.Custom.Example()
}
func startServer(started chan<- bool) {
proxy := httputil.NewSingleHostReverseProxy(&url.URL{Scheme: "http", Host: "localhost:9200"})
// Respond to "GET /_cat/example" with custom content and proxy all other requests to Elasticsearch
//
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
if r.Method == "GET" && r.URL.Path == "/_cat/example" {
io.WriteString(w, "Hello from Cat Example action")
return
}
proxy.ServeHTTP(w, r)
})
ln, err := net.Listen("tcp", "localhost:"+port)
if err != nil {
log.Fatalf("Unable to start server: %s", err)
}
go http.Serve(ln, nil)
started <- true
}
結(jié)果:
IX. Security
This example demonstrates how to encrypt and verify communication with the ES cluster using TLS (Transport Layer Security) with custom certificates.
1 Creating certificates for the cluster
The commands for creating the certificates are as follows:
OUTPUT="/certificates/bundle.zip"
if [[ -f $$OUTPUT ]]; then
echo "Certificates already present in [.$$OUTPUT]"; exit 1;
else
yum install -y -q -e 0 unzip tree;
bin/elasticsearch-certutil cert \
--pem \
--days 365 \
--keep-ca-key \
--in config/certificates/certificates-config.yml \
--out $$OUTPUT;
unzip -q $$OUTPUT -d /certificates;
chown -R 1000:0 /certificates; echo;
tree /certificates;
fi;
The certificates-config.yml file used by the command looks like this:
instances:
- name: elasticsearch
ip: [0.0.0.0, 127.0.0.1]
dns: ["localhost", "example_elasticsearch_1", "example_elasticsearch_2", "example_elasticsearch_3"]
Since this is only an example, a throwaway environment is desirable, so running the commands above in a Docker container is the best option.
The docker-compose file:
version: "3.7"
services:
create_certificates:
image: elasticsearch:7.12.1
container_name: certificates_generator
user: root
working_dir: /usr/share/elasticsearch
command: >
bash -c '
OUTPUT="/certificates/bundle.zip"
if [[ -f $$OUTPUT ]]; then
echo "Certificates already present in [.$$OUTPUT]"; exit 1;
else
yum install -y -q -e 0 unzip tree;
bin/elasticsearch-certutil cert \
--pem \
--days 365 \
--keep-ca-key \
--in config/certificates/certificates-config.yml \
--out $$OUTPUT;
unzip -q $$OUTPUT -d /certificates;
chown -R 1000:0 /certificates; echo;
tree /certificates;
fi;
'
volumes:
- ./certificates:/certificates
- ./certificates-config.yml:/usr/share/elasticsearch/config/certificates/certificates-config.yml
Run the container:
docker-compose --file certificates-create.yml run --rm create_certificates
When it finishes, a certificates folder containing the certificates will have been created in the current directory.
Now that the certificates exist, a cluster with security enabled can be created.
The docker-compose file:
version: "3.7"
services:
elasticsearch:
image: elasticsearch:${ELASTIC_VERSION}
volumes:
- es-data:/usr/share/elasticsearch/data
- ./certificates:/usr/share/elasticsearch/config/certificates/
networks:
- elasticstack
ports:
- 9200:9200
environment:
- node.name=example_elasticsearch_1
- cluster.name=golang-example-security
- cluster.initial_master_nodes=example_elasticsearch_1
- discovery.seed_hosts=example_elasticsearch_1
- bootstrap.memory_lock=true
- network.host=example_elasticsearch_1,_local_
- network.publish_host=example_elasticsearch_1
- ES_JAVA_OPTS=-Xms1G -Xmx1G -Des.transport.cname_in_publish_address=true
# Security & TLS
- ELASTIC_PASSWORD=${ELASTIC_PASSWORD}
- xpack.security.enabled=true
- xpack.security.http.ssl.enabled=true
- xpack.security.http.ssl.key=/usr/share/elasticsearch/config/certificates/elasticsearch/elasticsearch.key
- xpack.security.http.ssl.certificate=/usr/share/elasticsearch/config/certificates/elasticsearch/elasticsearch.crt
- xpack.security.http.ssl.certificate_authorities=/usr/share/elasticsearch/config/certificates/ca/ca.crt
- xpack.security.transport.ssl.enabled=true
- xpack.security.transport.ssl.verification_mode=certificate
- xpack.security.transport.ssl.key=/usr/share/elasticsearch/config/certificates/elasticsearch/elasticsearch.key
- xpack.security.transport.ssl.certificate=/usr/share/elasticsearch/config/certificates/elasticsearch/elasticsearch.crt
- xpack.security.transport.ssl.certificate_authorities=/usr/share/elasticsearch/config/certificates/ca/ca.crt
ulimits: { nofile: { soft: 262144, hard: 262144 }, memlock: -1 }
healthcheck:
test: curl --cacert /usr/share/elasticsearch/config/certificates/ca/ca.crt --max-time 120 --retry 120 --retry-delay 1 --show-error --silent https://elastic:${ELASTIC_PASSWORD}@localhost:9200
networks:
elasticstack: { labels: { elasticstack.description: "Network for the Elastic Stack" }}
volumes:
es-data: { labels: { elasticstack.description: "Elasticsearch data" }}
Start the cluster:
docker-compose --file elasticsearch-cluster.yml up --remove-orphans --detach
Access it with the certificate, together with the username and password:
curl --cacert certificates/ca/ca.crt https://elastic:elastic@localhost:9200
You should get a successful response.
2 Secure access with the Go client
2.1 Using the certificate field in the client configuration
First read the certificate from the file, then pass it in the client configuration:
// --> Read the certificate from the file
cert, _ := ioutil.ReadFile(*cacert)
es, _ := elasticsearch.NewClient(
elasticsearch.Config{
// ...
// --> Pass the certificate in the configuration
CACert: cert,
})
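A fuller version of this approach might look as follows (a sketch that assumes the elastic/elastic credentials and the certificate path from the Docker setup above):
package main

import (
	"io/ioutil"
	"log"

	"github.com/elastic/go-elasticsearch/v8"
)

func main() {
	// Read the CA certificate generated for the cluster.
	cert, err := ioutil.ReadFile("certificates/ca/ca.crt")
	if err != nil {
		log.Fatalf("Error reading the certificate: %s", err)
	}

	es, err := elasticsearch.NewClient(elasticsearch.Config{
		Addresses: []string{"https://localhost:9200"},
		Username:  "elastic",
		Password:  "elastic",
		// The certificate is used to verify the server's identity.
		CACert: cert,
	})
	if err != nil {
		log.Fatalf("Error creating the client: %s", err)
	}

	res, err := es.Info()
	if err != nil {
		log.Fatalf("Error getting response: %s", err)
	}
	defer res.Body.Close()
	log.Println(res)
}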
2.2 Creating a custom transport from the certificate
cert, _ := ioutil.ReadFile(*cacert)
// Clone the default transport
tp := http.DefaultTransport.(*http.Transport).Clone()
// Initialize the pool of root certificate authorities
tp.TLSClientConfig.RootCAs, _ = x509.SystemCertPool()
// Add the custom certificate authority
tp.TLSClientConfig.RootCAs.AppendCertsFromPEM(cert)
es, _ := elasticsearch.NewClient(
elasticsearch.Config{
Addresses: []string{"https://localhost:9200"},
Username: "elastic",
Password: *password,
// --> Pass the custom transport in the client configuration
//
Transport: tp,
},
)
X. Sample Application
Crawl information from Juejin's hot-recommended pages, save it to ES, and query it.
Sample project: thep0y/juejin-hot-es-example
Queries are run from the command line; the sample project's commands are as follows:
juejin allows you to index and search hot-recommended article's titles
Usage:
juejin [command]
Available Commands:
help Help about any command
index Index juejin hot-recommended articles into Elasticsearch
search Search juejin hot recommended articles
Flags:
-h, --help help for juejin
-i, --index string Index name (default "juejin")
Use "juejin [command] --help" for more information about a command.
The available commands are index and search.
The index command also has optional flags:
--pages int The count of pages you want to crawl (default 5)
--setup Create Elasticsearch index
This project uses a local ES instance (Docker is recommended), and the ik Chinese analysis plugin must be installed in ES.
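The project defines its own mapping; as a rough illustration of why the ik plugin matters, an index whose titles are analyzed with ik could be created like this (the field and analyzer choices here are assumptions, not the project's exact mapping):
package main

import (
	"log"
	"strings"

	"github.com/elastic/go-elasticsearch/v8"
)

func main() {
	es, err := elasticsearch.NewDefaultClient()
	if err != nil {
		log.Fatalf("Error creating the client: %s", err)
	}

	// Analyze titles with ik_max_word at index time and ik_smart at search time,
	// so Chinese text is split into words instead of single characters.
	mapping := `{
	  "mappings": {
	    "properties": {
	      "title": {
	        "type": "text",
	        "analyzer": "ik_max_word",
	        "search_analyzer": "ik_smart"
	      }
	    }
	  }
	}`

	res, err := es.Indices.Create(
		"juejin",
		es.Indices.Create.WithBody(strings.NewReader(mapping)),
	)
	if err != nil {
		log.Fatalf("Error creating the index: %s", err)
	}
	defer res.Body.Close()
	log.Println(res)
}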
1 Creating the index
go run main.go index --setup
By default this creates the index with the mapping defined in the project, then crawls and stores 5 pages, 100 articles in total.
結(jié)果如下所示:
8:10PM INF Creating index with mapping
8:10PM INF Starting the crawl with 0 workers at 0 offset
8:10PM INF Stored doc Article ID=6957974706943164447 title="算法篇01霎苗、排序算法"
8:10PM INF Stored doc Article ID=6953868764362309639 title="如何處理瀏覽器的斷網(wǎng)情況姆吭?"
...
8:10PM INF Skipping existing doc ID=6957726578692341791
8:10PM INF Skipping existing doc ID=6957925118429364255
8:10PM INF Skipping existing doc ID=6953868764362309639
8:10PM INF Skipping existing doc ID=6957981912669519903
8:10PM INF Skipping existing doc ID=6953059119561441287
8:10PM INF Skipping existing doc ID=6955336007839383588
...
8:10PM INF Stored doc Article ID=6957930535574306847 title="Node系列-阻塞和非阻塞的理解"
8:10PM INF Stored doc Article ID=6956602138201948196 title="《前端領(lǐng)域的轉(zhuǎn)譯打包工具鏈》上篇"
8:10PM INF Stored doc Article ID=6957982556885090312 title="JS篇:事件流"
Terminal screenshot:
Since each page has 20 items and 5 pages are crawled, in theory 100 articles should be stored, but a few of them may be duplicates, so the final count can be slightly below 100.
2 Crawling 10 pages
go run main.go index --pages 10
When this command runs, the index is not created again; the crawler starts right away. Because this is only a sample project, there is no option to choose the start and end pages; only the final page count is available as an optional flag.
The output is essentially the same as in the previous section:
3 Searching
Searches use a phrase query. A phrase query suits Chinese better; otherwise the query string gets split into single-character terms, and the results are usually not what we want.
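As a rough illustration (a sketch of my own, not code from the sample project; the index and field names are assumptions based on the CLI defaults), the search boils down to a match_phrase query with highlighting:
package main

import (
	"log"
	"strings"

	"github.com/elastic/go-elasticsearch/v8"
)

func main() {
	es, err := elasticsearch.NewDefaultClient()
	if err != nil {
		log.Fatalf("Error creating the client: %s", err)
	}

	// match_phrase keeps the analyzed terms together and in order, instead of
	// matching every single character independently; highlight marks the hits.
	body := `{
	  "query": { "match_phrase": { "title": "前端" } },
	  "highlight": { "fields": { "title": {} } }
	}`

	res, err := es.Search(
		es.Search.WithIndex("juejin"),
		es.Search.WithBody(strings.NewReader(body)),
		es.Search.WithPretty(),
	)
	if err != nil {
		log.Fatalf("Error getting response: %s", err)
	}
	defer res.Body.Close()
	log.Println(res)
}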
go run main.go search 前端
In the results, the query terms are highlighted: