Elasticsearch Go 客戶端

本文代碼來自于官方示例。

一窗慎、配置

此例演示的是配置客戶端的傳輸Transport

類例中的配置主要用于說明其作用卤材,不適用于生產(chǎn)環(huán)境遮斥。

默認(rèn)的傳輸就夠用了。

package main

import (
    "github.com/elastic/go-elasticsearch/v8"
)

func main() {
    cfg := elasticsearch.Config{
        Addresses: []string{"http://localhost:9200"},
        Transport: &http.Transport{
            MaxIdleConnsPerHost:   10,
            ResponseHeaderTimeout: time.Millisecond,
            DialContext:           (&net.Dialer{Timeout: time.Nanosecond}).DialContext,
            TLSClientConfig: &tls.Config{
                MinVersion: tls.VersionTLS11,
            },
        },
    }

    es, err := elasticsearch.NewClient(cfg)
    if err != nil {
        panic(err)
    }

    resp, err := es.Info()
    if err != nil {
        panic(err)
    }
    fmt.Println(resp)
    // => panic: dial tcp: i/o timeout
}

二扇丛、自定義傳輸

自定義傳輸用于讀取或操作請求和響應(yīng)术吗,自定義日志記錄,將自定義 header 傳遞給請求等帆精。

CountingTransport將自定義 header 添加到請求较屿,記錄有關(guān)請求和響應(yīng)的信息,并統(tǒng)計請求次數(shù)卓练。

由于它實現(xiàn)了http.RoundTripper接口隘蝎,因此可以將其作為自定義 HTTP 傳輸實現(xiàn)傳遞給客戶端。

type CountingTransport struct {
    count uint64
}

// RoundTrip 發(fā)送一個請求襟企,返回一個響應(yīng)
func (t *CountingTransport) RoundTrip(req *http.Request) (*http.Response, error) {
    var buf bytes.Buffer

    atomic.AddUint64(&t.count, 1)

    req.Header.Set("Accept", "application/yaml")
    req.Header.Set("X-Request-ID", "foo-123")

    res, err := http.DefaultTransport.RoundTrip(req)

    buf.WriteString(strings.Repeat("-", 80) + "\n")
    fmt.Fprintf(&buf, "%s %s", req.Method, req.URL.String())

    if err != nil {
        fmt.Fprintf(&buf, "ERROR: %s\n", err)
    } else {
        fmt.Fprintf(&buf, "[%s] %s\n", res.Status, res.Header.Get("Content-Type"))
    }

    buf.WriteTo(os.Stdout)

    return res, err
}

func main() {
    var wg sync.WaitGroup

    // 創(chuàng)建一個自定義傳輸
    tp := new(CountingTransport)

    // 將自定義傳輸放到客戶端配置中
    es, _ := elasticsearch.NewClient(elasticsearch.Config{Transport: tp})

    for i := 0; i < 25; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            es.Info()
        }()
    }

    wg.Wait()

    fmt.Println(strings.Repeat("=", 80))
    fmt.Printf("%80s\n", fmt.Sprintf("Total Requests: %d", atomic.LoadUint64(&tp.count)))
}

三嘱么、日志

1 使用默認(rèn)日志

默認(rèn)日志有以下四種:

  • estransport.TextLogger
  • estransport.ColorLogger
  • estransport.CurlLogger
  • estransport.JSONLogger
func main() {
    log.SetFlags(0)

    var es *elasticsearch.Client

    es, _ = elasticsearch.NewClient(elasticsearch.Config{
        Logger: &estransport.TextLogger{Output: os.Stdout},
    })
    run(es, "Text")

    es, _ = elasticsearch.NewClient(elasticsearch.Config{
        Logger: &estransport.ColorLogger{Output: os.Stdout},
    })
    run(es, "Color")

    es, _ = elasticsearch.NewClient(elasticsearch.Config{
        Logger: &estransport.ColorLogger{
            Output:             os.Stdout,
            EnableRequestBody:  true,
            EnableResponseBody: true,
        },
    })
    run(es, "Request/Response body")

    es, _ = elasticsearch.NewClient(elasticsearch.Config{
        Logger: &estransport.CurlLogger{
            Output:             os.Stdout,
            EnableRequestBody:  true,
            EnableResponseBody: true,
        },
    })
    run(es, "Curl")

    es, _ = elasticsearch.NewClient(elasticsearch.Config{
        Logger: &estransport.JSONLogger{
            Output: os.Stdout,
        },
    })
    run(es, "JSON")
}

func run(es *elasticsearch.Client, name string) {
    log.Println("███", fmt.Sprintf("\x1b[1m%s\x1b[0m", name), strings.Repeat("█", 75-len(name)))

    es.Delete("test", "1")
    es.Exists("test", "1")

    es.Index(
        "test",
        strings.NewReader(`{"title": "logging"}`),
        es.Index.WithRefresh("true"),
        es.Index.WithPretty(),
        es.Index.WithFilterPath("result", "_id"),
    )

    es.Search(es.Search.WithQuery("{FAIL"))

    res, err := es.Search(
        es.Search.WithIndex("test"),
        es.Search.WithBody(strings.NewReader(`{"query": {"match": {"title": "logging"}}}`)),
        es.Search.WithSize(1),
        es.Search.WithPretty(),
        es.Search.WithFilterPath("took", "hits.hits"),
    )

    s := res.String()

    if len(s) <= len("[200 OK] ") {
        log.Fatal("Response body is empty")
    }

    if err != nil {
        log.Fatalf("Error: %s", err)
    }

    log.Println()
}

四種日志的用途:

  • TextLogger:將請求和響應(yīng)的基本信息以明文的形式輸出
  • ColorLogger:在開發(fā)時能在終端將一些信息以不同顏色輸出
  • CurlLogger:將信息格式化為可運行的curl命令,當(dāng)啟用EnableResponseBody時會美化輸出
  • JSONLogger:將信息以 json 格式輸出顽悼,適用于生產(chǎn)環(huán)境的日志

如果要記錄請求和響應(yīng)的 body 內(nèi)容曼振,需要開啟對應(yīng)的選項:

  • EnableRequestBody:記錄請求體
  • EnableResponseBody:記錄響應(yīng)體

2 自定義日志

根據(jù)estransport.Logger接口几迄,實現(xiàn)自定義日志。

日志包使用rs/zerolog冰评。

package main

import (
    "github.com/rs/zerolog"
)

// CustomLogger 實現(xiàn) estransport.Logger 接口
type CustomLogger struct {
    zerolog.Logger
}

// LogRoundTrip 打印請求和響應(yīng)的一些信息
func (l *CustomLogger) LogRoundTrip(
    req *http.Request,
    res *http.Response,
    err error,
    start time.Time,
    dur time.Duration,
) error {
    var (
        e    *zerolog.Event
        nReq int64
        nRes int64
    )

    // 設(shè)置日志等級
    switch {
    case err != nil:
        e = l.Error()
    case res != nil && res.StatusCode > 0 && res.StatusCode < 300:
        e = l.Info()
    case res != nil && res.StatusCode > 299 && res.StatusCode < 500:
        e = l.Warn()
    case res != nil && res.StatusCode > 499:
        e = l.Error()
    default:
        e = l.Error()
    }

    // 計算請求體和響應(yīng)體的字節(jié)數(shù)
    if req != nil && req.Body != nil && req.Body != http.NoBody {
        nReq, _ = io.Copy(ioutil.Discard, req.Body)
    }
    if res != nil && res.Body != nil && res.Body != http.NoBody {
        nRes, _ = io.Copy(ioutil.Discard, res.Body)
    }

    // 日志事件
    e.Str("method", req.Method).
        Int("status_code", res.StatusCode).
        Dur("duration", dur).
        Int64("req_bytes", nReq).
        Int64("res_bytes", nRes).
        Msg(req.URL.String())

    return nil
}

// RequestBodyEnabled 輸出請求體
func (l *CustomLogger) RequestBodyEnabled() bool { return true }

// ResponseBodyEnabled 輸出響應(yīng)體
func (l *CustomLogger) ResponseBodyEnabled() bool { return true }

func main() {

    // 設(shè)置日志
    log := zerolog.New(zerolog.ConsoleWriter{Out: os.Stderr}).
        Level(zerolog.InfoLevel).
        With().
        Timestamp().
        Logger()

    // 客戶端使用自定義的日志
    es, _ := elasticsearch.NewClient(elasticsearch.Config{
        Logger: &CustomLogger{log},
    })

    {
        es.Delete("test", "1")
        es.Exists("test", "1")
        es.Index("test", strings.NewReader(`{"title": "logging"}`), es.Index.WithRefresh("true"))

        es.Search(
            es.Search.WithQuery("{FAIL"),
        )

        es.Search(
            es.Search.WithIndex("test"),
            es.Search.WithBody(strings.NewReader(`{"query": {"match": {"title": "logging"}}}`)),
            es.Search.WithSize(1),
        )
    }
}

結(jié)果如圖:

截屏2021-04-25 20.31.31

四映胁、批量索引

1 默認(rèn)

此示例有意不使用任何抽象或輔助功能來展示使用 bulk api 的低級機(jī)制:準(zhǔn)備元數(shù)據(jù)有效載荷,批量發(fā)送有效載荷甲雅,檢查錯誤結(jié)果并打印報告解孙。

請看代碼注釋:

package main

import (
    "github.com/dustin/go-humanize"
    "github.com/elastic/go-elasticsearch/v8"
    "github.com/elastic/go-elasticsearch/v8/esapi"
)

type Article struct {
    ID        int       `json:"id"`
    Title     string    `json:"title"`
    Body      string    `json:"body"`
    Published time.Time `json:"published"`
    Author    Author    `json:"author"`
}

type Author struct {
    FirstName string `json:"first_name"`
    LastName string `json:"last_name"`
}

var (
    _     = fmt.Print
    count int
    batch int
)

func init() {
    flag.IntVar(&count, "count", 1000, "生成的文檔數(shù)量")
    flag.IntVar(&batch, "batch", 255, "每次發(fā)送的文檔數(shù)量")
    flag.Parse()

    rand.Seed(time.Now().UnixNano())
}

func main() {
    log.SetFlags(0)

    type bulkResponse struct {
        Errors bool `json:"errors"`
        Items  []struct {
            Index struct {
                ID     string `json:"_id"`
                Result string `json:"result"`
                Status int    `json:"status"`
                Error  struct {
                    Type   string `json:"type"`
                    Reason string `json:"reason"`
                    Cause  struct {
                        Type   string `json:"type"`
                        Reason string `json:"reason"`
                    } `json:"caused_by"`
                } `json:"error"`
            } `json:"index"`
        } `json:"items"`
    }

    var (
        buf bytes.Buffer
        res *esapi.Response
        err error
        raw map[string]interface{}
        blk *bulkResponse

        articles  []*Article
        indexName = "articles"

        numItems   int
        numErrors  int
        numIndexed int
        numBatches int
        currBatch  int
    )

    log.Printf(
        "\x1b[1mBulk\x1b[0m: documents [%s] batch size [%s]",
        humanize.Comma(int64(count)), humanize.Comma(int64(batch)))
    log.Println(strings.Repeat("_", 65))

    // 創(chuàng)建客戶端
    es, err := elasticsearch.NewDefaultClient()
    if err != nil {
        panic(err)
    }

    // 生成文章
    names := []string{"Alice", "John", "Mary"}
    for i := 1; i < count+1; i++ {
        articles = append(articles, &Article{
            ID:        i,
            Title:     strings.Join([]string{"Title", strconv.Itoa(i)}, " "),
            Body:      "Lorem ipsum dolor sit amet...",
            Published: time.Now().Round(time.Second).Local().AddDate(0, 0, i),
            Author: Author{
                FirstName: names[rand.Intn(len(names))],
                LastName: "Smith",
            },
        })
    }
    log.Printf("→ Generated %s articles", humanize.Comma(int64(len(articles))))
    fmt.Println("→ Sending batch ")

    // 重新創(chuàng)建索引
    if res, err = es.Indices.Delete([]string{indexName}); err != nil {
        log.Fatalf("Cannot delete index: %s", err)
    }

    res, err = es.Indices.Create(indexName)
    if err != nil {
        log.Fatalf("Cannot create index: %s", err)
    }

    if res.IsError() {
        log.Fatalf("Cannot create index: %s", res)
    }

    if count%batch == 0 {
        numBatches = count / batch
    } else {
        numBatches = count/batch + 1
    }

    start := time.Now().Local()

    // 循環(huán)收集
    for i, a := range articles {
        numItems++

        currBatch = i / batch
        if i == count-1 {
            currBatch++
        }

        // 準(zhǔn)備元數(shù)據(jù)有效載荷
        meta := []byte(fmt.Sprintf(`{ "index" : { "_id" : "%d" } }%s`, a.ID, "\n"))

        // 準(zhǔn)備 data 有效載荷:序列化后的 article
        data, err := json.Marshal(a)
        if err != nil {
            log.Fatalf("Cannot encode article %d: %s", a.ID, err)
        }

        // 在 data 載荷中添加一個換行符
        data = append(data, "\n"...)

        // 將載荷添加到 buf 中
        buf.Grow(len(meta) + len(data))
        buf.Write(meta)
        buf.Write(data)

        // 達(dá)到閾值時,使用 buf 中的數(shù)據(jù)(請求體)執(zhí)行 Bulk() 請求
        if i > 0 && i%batch == 0 || i == count-1 {
            fmt.Printf("[%d/%d] ", currBatch, numBatches)

            // 每 batch(本例中是255)個為一組發(fā)送
            res, err = es.Bulk(bytes.NewReader(buf.Bytes()), es.Bulk.WithIndex(indexName))
            if err != nil {
                log.Fatalf("Failur indexing batch %d: %s", currBatch, err)
            }

            // 如果整個請求失敗抛人,打印錯誤并標(biāo)記所有文檔都失敗
            if res.IsError() {
                numErrors += numItems
                if err := json.NewDecoder(res.Body).Decode(&raw); err != nil {
                    log.Fatalf("Failure to parse response body: %s", err)
                } else {
                    log.Printf(" Error: [%d] %s: %s",
                        res.StatusCode,
                        raw["error"].(map[string]interface{})["type"],
                        raw["error"].(map[string]interface{})["reason"],
                    )
                }
            } else {  // 一個成功的響應(yīng)也可能因為一些特殊文檔包含一些錯誤
                if err := json.NewDecoder(res.Body).Decode(&blk); err != nil {
                    log.Fatalf("Failure to parse response body: %s", err)
                } else {
                    for _, d := range blk.Items {
                        // 對任何狀態(tài)碼大于 201 的請求進(jìn)行處理
                        if d.Index.Status > 201 {
                            numErrors++
                            log.Printf("  Error: [%d]: %s: %s: %s: %s",
                                d.Index.Status,
                                d.Index.Error.Type,
                                d.Index.Error.Reason,
                                d.Index.Error.Cause.Type,
                                d.Index.Error.Cause.Reason,
                            )
                        } else {
                            // 如果狀態(tài)碼小于等于 201妆距,對成功的計數(shù)器 numIndexed 加 1
                            numIndexed++
                        }
                    }
                }
            }

            // 關(guān)閉響應(yīng)體,防止達(dá)到 Goroutines 或文件句柄限制
            res.Body.Close()


            // 重置 buf 和 items 計數(shù)器
            buf.Reset()
            numItems = 0
        }
    }

    // 報告結(jié)果:索引成功的文檔的數(shù)量函匕,錯誤的數(shù)量娱据,耗時,索引速度
    fmt.Println()
    log.Println(strings.Repeat("▔", 65))

    dur := time.Since(start)

    if numErrors > 0 {
        log.Fatalf(
            "Indexed [%s] documents with [%s] errors in %s (%s docs/sec)",
            humanize.Comma(int64(numIndexed)),
            humanize.Comma(int64(numErrors)),
            dur.Truncate(time.Millisecond),
            humanize.Comma(int64(1000.0/float64(dur/time.Millisecond)*float64(numIndexed))),
        )
    } else {
        log.Printf(
            "Successfuly indexed [%s] documents in %s (%s docs/sec)",
            humanize.Comma(int64(numIndexed)),
            dur.Truncate(time.Millisecond),
            humanize.Comma(int64(1000.0/float64(dur/time.Millisecond)*float64(numIndexed))),
        )
    }
}

countbatch為可選參數(shù)盅惜,在執(zhí)行時可以自定義:

? go run main.go -count=100000 -batch=25000

Bulk: documents [100,000] batch size [25,000]
▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁
→ Generated 100,000 articles
→ Sending batch
[1/4] [2/4] [3/4] [4/4]
▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔
Successfuly indexed [100,000] documents in 2.79s (35,842 docs/sec)

2 索引器

此例演示使用esutil.BulkIndexer幫助程序來索引文檔中剩。

package main

import (
    "github.com/dustin/go-humanize"
    "github.com/cenkalti/backoff/v4"

    "github.com/elastic/go-elasticsearch/v8"
    "github.com/elastic/go-elasticsearch/v8/esapi"
    "github.com/elastic/go-elasticsearch/v8/esutil"
)

type Article struct {
    ID        int       `json:"id"`
    Title     string    `json:"title"`
    Body      string    `json:"body"`
    Published time.Time `json:"published"`
    Author    Author    `json:"author"`
}

type Author struct {
    FirstName string `json:"first_name"`
    LastName  string `json:"last_name"`
}

var (
    indexName  string
    numWorkers int
    flushBytes int
    numItems   int
)

func init() {
    flag.StringVar(&indexName, "index", "test-bulk-example", "索引名稱")
    flag.IntVar(&numWorkers, "workers", runtime.NumCPU(), "工作進(jìn)程數(shù)量")
    flag.IntVar(&flushBytes, "flush", 5e+6, "以字節(jié)為單位的清除閾值")
    flag.IntVar(&numItems, "count", 10000, "生成的文檔數(shù)量")
    flag.Parse()

    rand.Seed(time.Now().UnixNano())
}

func main() {
    log.SetFlags(0)

    var (
        articles        []*Article
        countSuccessful uint64

        res *esapi.Response
        err error
    )

    log.Printf(
        "\x1b[1mBulkIndexer\x1b[0m: documents [%s] workers [%d] flush [%s]",
        humanize.Comma(int64(numItems)), numWorkers, humanize.Bytes(uint64(flushBytes)))
    log.Println(strings.Repeat("▁", 65))

    // 使用第三方包來實現(xiàn)回退功能
    retryBackoff := backoff.NewExponentialBackOff()

    // 創(chuàng)建客戶端。如果想使用最佳性能抒寂,請考慮使用第三方 http 包结啼,benchmarks 示例中有寫
    es, err := elasticsearch.NewClient(elasticsearch.Config{
        // 429 太多請求
        RetryOnStatus: []int{502, 503, 504, 429},
        // 配置回退函數(shù)
        RetryBackoff: func(attempt int) time.Duration {
            if attempt == 1 {
                retryBackoff.Reset()
            }
            return retryBackoff.NextBackOff()
        },
        // 最多重試 5 次
        MaxRetries: 5,
    })
    if err != nil {
        log.Fatalf("Error creating the client: %s", err)
    }

    // 創(chuàng)建批量索引器。要使用最佳性能屈芜,考慮使用第三方 json 包郊愧,benchmarks 示例中有寫
    bi, err := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
        Index:         indexName, // 默認(rèn)索引名
        Client:        es,  // es 客戶端
        NumWorkers:    numWorkers, // 工作進(jìn)程數(shù)量
        FlushBytes:    int(flushBytes), // 清除上限
        FlushInterval: 30 * time.Second, // 定期清除間隔
    })
    if err != nil {
        log.Fatalf("Error creating the indexer: %s", err)
    }

    // 生成文章
    names := []string{"Alice", "John", "Mary"}
    for i := 1; i <= numItems; i++ {
        articles = append(articles, &Article{
            ID:        i,
            Title:     strings.Join([]string{"Title", strconv.Itoa(i)}, " "),
            Body:      "Lorem ipsum dolor sit amet...",
            Published: time.Now().Round(time.Second).UTC().AddDate(0, 0, i),
            Author: Author{
                FirstName: names[rand.Intn(len(names))],
                LastName:  "Smith",
            },
        })
    }
    log.Printf("→ Generated %s articles", humanize.Comma(int64(len(articles))))

    // 重新創(chuàng)建索引
    if res, err = es.Indices.Delete([]string{indexName}, es.Indices.Delete.WithIgnoreUnavailable(true)); err != nil || res.IsError() {
        log.Fatalf("Cannot delete index: %s", err)
    }

    res.Body.Close()

    res, err = es.Indices.Create(indexName)
    if err != nil {
        log.Fatalf("Cannot create index: %s", err)
    }
    if res.IsError() {
        log.Fatalf("Cannot create index: %s", res)
    }
    res.Body.Close()

    start := time.Now().UTC()

    // 循環(huán)收集
    for _, a := range articles {
        // 準(zhǔn)備 data:序列化的 article
        data, err := json.Marshal(a)
        if err != nil {
            log.Fatalf("Cannot encode article %d: %s", a.ID, err)
        }

        // 添加 item 到 BulkIndexer
        err = bi.Add(
            context.Background(),
            esutil.BulkIndexerItem{
                // Action 字段配置要執(zhí)行的操作(索引、創(chuàng)建井佑、刪除属铁、更新)
                Action:     "index",
                // DocumentID 是文檔 ID(可選)
                DocumentID: strconv.Itoa(a.ID),
                // Body 是 有效載荷的 io.Reader
                Body:       bytes.NewReader(data),
                // OnSuccess 在每一個成功的操作后調(diào)用
                OnSuccess: func(c context.Context, bii esutil.BulkIndexerItem, biri esutil.BulkIndexerResponseItem) {
                    atomic.AddUint64(&countSuccessful, 1)
                },
                // OnFailure 在每一個失敗的操作后調(diào)用
                OnFailure: func(c context.Context, bii esutil.BulkIndexerItem, biri esutil.BulkIndexerResponseItem, e error) {
                    if err != nil {
                        log.Printf("ERROR: %s", err)
                    } else {
                        log.Printf("ERROR: %s: %s", biri.Error.Type, biri.Error.Reason)
                    }
                },
            },
        )
        if err != nil {
            log.Fatalf("Unexpected error: %s", err)
        }
    }

    // 關(guān)閉索引器
    if err := bi.Close(context.Background()); err != nil {
        log.Fatalf("Unexpected error: %s", err)
    }

    biStats := bi.Stats()

    // 報告結(jié)果:索引成功的文檔的數(shù)量,錯誤的數(shù)量躬翁,耗時焦蘑,索引速度
    log.Println(strings.Repeat("▔", 65))

    dur := time.Since(start)

    if biStats.NumFailed > 0 {
        log.Fatalf(
            "Indexed [%s] documents with [%s] errors in %s (%s docs/sec)",
            humanize.Comma(int64(biStats.NumFlushed)),
            humanize.Comma(int64(biStats.NumFailed)),
            dur.Truncate(time.Millisecond),
            humanize.Comma(int64(1000.0/float64(dur/time.Millisecond)*float64(biStats.NumFlushed))),
        )
    } else {
        log.Printf(
            "Sucessfuly indexed [%s] documents in %s (%s docs/sec)",
            humanize.Comma(int64(biStats.NumFlushed)),
            dur.Truncate(time.Millisecond),
            humanize.Comma(int64(1000.0/float64(dur/time.Millisecond)*float64(biStats.NumFlushed))),
        )
    }
}

四個可選參數(shù),見init()函數(shù):

 ?  bulk go run indexer.go --workers=8 --count=100000 --flush=1000000
 
BulkIndexer: documents [100,000] workers [8] flush [1.0 MB]
▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁
→ Generated 100,000 articles
▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔
Sucessfuly indexed [100,000] documents in 1.584s (63,131 docs/sec)

五盒发、編碼

本示例中演示了如何使用 helper 方法和第三方 json 庫例嘱。

1 第三方 json 庫

1.1 tidwall/gjson

github.com/tidwall/gjson庫允許在不將有效載荷轉(zhuǎn)換為數(shù)據(jù)結(jié)構(gòu)的前提下輕松訪問屬性。

package main

import (
    "github.com/elastic/go-elasticsearch/v8"
    "github.com/fatih/color"
    "github.com/tidwall/gjson"
)

var (
    faint = color.New(color.Faint)
    bold  = color.New(color.Bold)
)

func init() {
    log.SetFlags(0)
}

func main() {
    es, err := elasticsearch.NewDefaultClient()
    if err != nil {
        log.Fatalf("Error creating client: %s", err)
    }

    res, err := es.Cluster.Stats(es.Cluster.Stats.WithHuman())
    if err != nil {
        log.Fatalf("Error getting response: %s", err)
    }
    defer res.Body.Close()

    json := read(res.Body)

    fmt.Println(strings.Repeat("-", 50))
    faint.Print("cluster ")
    // 獲取集群名
    bold.Print(gjson.Get(json, "cluster_name"))

    faint.Print(" status=")
    // 獲取集群健康狀態(tài)
    status := gjson.Get(json, "status")
    switch status.Str {
    case "green":
        bold.Add(color.FgHiGreen).Print(status)
    case "yellow":
        bold.Add(color.FgHiYellow).Print(status)
    case "red":
        bold.Add(color.FgHiRed).Print(status)
    default:
        bold.Add(color.FgHiRed, color.Underline).Print(status)
    }
    fmt.Println("\n" + strings.Repeat("-", 50))

    stats := []string{
        "indices.count",
        "indices.docs.count",
        "indices.store.size",
        "nodes.count.total",
        "nodes.os.mem.used_percent",
        "nodes.process.cpu.percent",
        "nodes.jvm.versions.#.version",
        "nodes.jvm.mem.heap_used",
        "nodes.jvm.mem.heap_max",
        "nodes.fs.free",
    }

    var maxWidth int
    for _, item := range stats {
        if len(item) > maxWidth {
            maxWidth = len(item)
        }
    }

    for _, item := range stats {
        pad := maxWidth - len(item)
        fmt.Print(strings.Repeat(" ", pad))
        faint.Printf("%s |", item)
        // 從 json 動態(tài)獲取狀態(tài)
        fmt.Printf(" %s\n", gjson.Get(json, item))
    }
    fmt.Println()
}

func read(r io.Reader) string {
    var b bytes.Buffer
    b.ReadFrom(r)
    return b.String()
}
go run main.go
截屏2021-04-28 20.07.08

1.2 mailru/easyjson

mailru/easyjson可以用提供的結(jié)構(gòu)體生成編碼和解碼的代碼宁舰。

示例項目結(jié)構(gòu)不適合在此處展示拼卵,故另寫一篇文章,請參閱:Elasticsearch 的 easyjson 示例

1.3 ES 中的 JSONReader

esutil.JSONReader()方法將 struct蛮艰、map 或任何其他可序列化對象轉(zhuǎn)換為封裝在 reader 中的 JSON腋腮,然后將其傳遞給WithBody()方法:

type Document struct{ Title string }
doc := Document{Title: "Test"}
es.Search(es.Search.WithBody(esutil.NewJSONReader(&doc)))

完整示例:

package main

import (
    "github.com/elastic/go-elasticsearch/v8"
    "github.com/elastic/go-elasticsearch/v8/esapi"
    "github.com/elastic/go-elasticsearch/v8/esutil"
)

func init() {
    log.SetFlags(0)
}

func main() {
    var (
        res *esapi.Response
        err error
    )

    es, err := elasticsearch.NewDefaultClient()
    if err != nil {
        log.Fatalf("Error creating the client: %s", err)
    }

    doc := struct {
        Title string `json:"title"`
    }{Title: "Test"}

    res, err = es.Index("test", esutil.NewJSONReader(&doc), es.Index.WithRefresh("true"))
    if err != nil {
        log.Fatalf("Error getting response: %s", err)
    }

    log.Println(res)

    query := map[string]interface{}{
        "query": map[string]interface{}{
            "match": map[string]interface{}{
                "title": "test",
            },
        },
    }

    res, err = es.Search(
        es.Search.WithIndex("test"),
        es.Search.WithBody(esutil.NewJSONReader(&query)),
        es.Search.WithPretty(),
    )
    if err != nil {
        log.Fatalf("Error getting response: %s", err)
    }

    log.Println(res)
}

運行結(jié)果:

[201 Created] {"_index":"test","_type":"_doc","_id":"4l7aG3kBuWpKCVVn78cc","_version":1,"result":"created","forced_refresh":true,"_shards":{"total":2,"successful":1,"failed":0},"_seq_no":28,"_primary_term":4}
[200 OK] {
  "took" : 18,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 1,
      "relation" : "eq"
    },
    "max_score" : 2.3671236,
    "hits" : [
      {
        "_index" : "test",
        "_type" : "_doc",
        "_id" : "4l7aG3kBuWpKCVVn78cc",
        "_score" : 2.3671236,
        "_source" : {
          "title" : "Test"
        }
      }
    ]
  }
}

1.4 幾個 json 庫的基準(zhǔn)測試結(jié)果

BenchmarkEncode/Article_-_json-8              671839          1677 ns/op         768 B/op          5 allocs/op
BenchmarkEncode/Article_-_JSONReader-8        712545          1685 ns/op         824 B/op          7 allocs/op
BenchmarkEncode/Article_-_easyjson-8         1503753           802.0 ns/op       760 B/op          6 allocs/op
BenchmarkEncode/map_-_json-8                  934605          1279 ns/op         672 B/op         18 allocs/op
BenchmarkEncode/map_-_JSONReader-8            824247          1421 ns/op         728 B/op         20 allocs/op
BenchmarkDecode/Search_-_json-8                46322         25893 ns/op        9258 B/op         75 allocs/op
BenchmarkDecode/Search_-_easyjson-8           103856         11344 ns/op       12635 B/op         70 allocs/op
BenchmarkDecode/Cluster_-_json_-_map-8         23635         50752 ns/op       31603 B/op        385 allocs/op
BenchmarkDecode/Cluster_-_json_-_stc-8         38974         30788 ns/op       15160 B/op         20 allocs/op
BenchmarkDecode/Cluster_-_gjson-8             909138          1354 ns/op         256 B/op          3 allocs/op

mailru/easyjsontidwall/gjson 在不同場景下都比標(biāo)準(zhǔn)庫有更好的性能。

六、第三方 http 庫

本例演示如何使用fasthttp替換默認(rèn)的net/http低葫,并測試二者的性能详羡。

1 示例代碼

package http

import (
    "io/ioutil"
    "net/http"
    "strings"

    "github.com/valyala/fasthttp"
)

// Transport 用 fasthttp 實現(xiàn) es 接口
type Transport struct{}

// RoundTrip 發(fā)送請求仍律,返回響應(yīng)或錯誤
func (t *Transport) RoundTrip(req *http.Request) (*http.Response, error) {
    freq := fasthttp.AcquireRequest()
    defer fasthttp.ReleaseRequest(freq)

    fres := fasthttp.AcquireResponse()
    defer fasthttp.ReleaseResponse(fres)

    t.copyRequest(freq, req)

    err := fasthttp.Do(freq, fres)
    if err != nil {
        return nil, err
    }

    res := &http.Response{Header: make(http.Header)}
    t.copyResponse(res, fres)

    return res, nil
}

// copyRequest 將 http 請求轉(zhuǎn)換為 fasthttp 請求
func (t *Transport) copyRequest(dst *fasthttp.Request, src *http.Request) *fasthttp.Request {
    if src.Method == http.MethodGet && src.Body != nil {
        src.Method = http.MethodPost
    }

    dst.SetHost(src.Host)
    dst.SetRequestURI(src.URL.String())

    dst.Header.SetRequestURI(src.URL.String())
    dst.Header.SetMethod(src.Method)

    for k, vs := range src.Header {
        for _, v := range vs {
            dst.Header.Set(k, v)
        }
    }

    if src.Body != nil {
        dst.SetBodyStream(src.Body, -1)
    }

    return dst
}

// copyResponse 將 fasthttp 響應(yīng)轉(zhuǎn)換為 http 響應(yīng)
func (t *Transport) copyResponse(dst *http.Response, src *fasthttp.Response) *http.Response {
    dst.StatusCode = src.StatusCode()

    src.Header.VisitAll(func(key, value []byte) {
        dst.Header.Set(string(key), string(value))
    })

    // 在響應(yīng)被釋放回響應(yīng)池(fasthttp.ReleaseResponse)后嘿悬,將 src.Body() 轉(zhuǎn)換為字符串 Reader
    dst.Body = ioutil.NopCloser(strings.NewReader(string(src.Body())))

    return dst
}

2 基準(zhǔn)測試

測試代碼:

package http_test

import (
    "elasticsearch/fasthttp/http"
    "testing"

    "github.com/elastic/go-elasticsearch/v8"
)

func BenchmarkHTTPClient(b *testing.B) {
    b.ReportAllocs()

    client, err := elasticsearch.NewDefaultClient()
    if err != nil {
        b.Fatalf("ERROR: %s", err)
    }

    b.Run("Info()", func(b *testing.B) {
        b.ResetTimer()

        for i := 0; i < b.N; i++ {
            if res, err := client.Info(); err != nil {
                b.Errorf("Unexpected error when getting a response: %s", err)
            } else {
                res.Body.Close()
            }
        }
    })
}

func BenchmarkFastHTTPClient(b *testing.B) {
    b.ReportAllocs()

    client, err := elasticsearch.NewClient(
        elasticsearch.Config{Transport: &http.Transport{}},
    )
    if err != nil {
        b.Fatalf("ERROR: %s", err)
    }

    b.Run("Info()", func(b *testing.B) {
        b.ResetTimer()

        for i := 0; i < b.N; i++ {
            if res, err := client.Info(); err != nil {
                b.Errorf("Unexpected error when getting a response: %s", err)
            } else {
                res.Body.Close()
            }
        }
    })
}

結(jié)果:

...
BenchmarkHTTPClient/Info()-8                6067       2438072 ns/op       15908 B/op        120 allocs/op
...
BenchmarkFastHTTPClient/Info()-8           14690        811282 ns/op        2325 B/op         27 allocs/op

http 標(biāo)準(zhǔn)庫的性能在慢慢提高,但是仍然與 fasthttp 有著不小的差距水泉。

七善涨、檢測儀

此示例演示了如何檢測 es 客戶端。

1 OpenCensus

使用 ochttp.Transport自動檢測客戶端調(diào)用草则,并將有關(guān)信息打印到終端钢拧。

package main

import (
    "github.com/elastic/go-elasticsearch/v8"
    "github.com/fatih/color"

    "go.opencensus.io/plugin/ochttp"
    "go.opencensus.io/stats/view"
    "go.opencensus.io/tag"
    "go.opencensus.io/trace"
    "golang.org/x/crypto/ssh/terminal"
)

const count = 100

var (
    tWidth, _, _ = terminal.GetSize(int(os.Stdout.Fd()))

    faint   = color.New(color.Faint)
    bold    = color.New(color.Bold)
    boldRed = color.New(color.FgRed).Add(color.Bold)

    tagMethod, _ = tag.NewKey("method")
)

func init() {
    if tWidth < 0 {
        tWidth = 0
    }
}

// ConsoleExporter 將狀態(tài)和追蹤軌跡打印到終端
type ConsoleExporter struct{}

// ExportView 打印狀態(tài)
func (e *ConsoleExporter) ExportView(vd *view.Data) {
    fmt.Println(strings.Repeat("─", tWidth))
    for _, row := range vd.Rows {
        faint.Print("█ ")
        fmt.Printf("%-17s ", strings.TrimPrefix(vd.View.Name, "opencensus.io/http/client/"))

        switch v := row.Data.(type) {
        case *view.DistributionData:
            bold.Printf("min=%-6.1f max=%-6.1f mean=%-6.1f", v.Min, v.Max, v.Mean)
        case *view.CountData:
            bold.Printf("count=%-3v", v.Value)
        case *view.SumData:
            bold.Printf("sum=%-3v", v.Value)
        case *view.LastValueData:
            bold.Printf("last=%-3v", v.Value)
        }
        faint.Print(" │ ")

        for _, tag := range row.Tags {
            faint.Printf("%-25s ", fmt.Sprintf("%v=%v", tag.Key.Name(), tag.Value))
        }
        fmt.Println()
    }
}

// ExportSpan 打印追蹤軌跡
func (e *ConsoleExporter) ExportSpan(sd *trace.SpanData) {
    var c *color.Color
    if sd.Status.Code > 0 {
        c = color.New(color.FgRed)
    } else {
        c = color.New(color.FgGreen)
    }

    fmt.Println(strings.Repeat("─", tWidth))

    fmt.Printf(
        "? %s %s %s\n",
        c.Sprint(sd.Status.Message),
        bold.Sprint(sd.Name),
        sd.EndTime.Sub(sd.StartTime).Round(time.Millisecond))

    faint.Printf("? %x > %x\n", sd.SpanContext.TraceID[:], sd.SpanContext.SpanID[:])

    if len(sd.Attributes) > 0 {
        faint.Print("? ")
        var keys []string
        for k := range sd.Attributes {
            keys = append(keys, k)
        }
        sort.Strings(keys)
        for i, k := range keys {
            faint.Printf("%s=%v", k, sd.Attributes[k])
            if i < len(keys)-1 {
                faint.Printf(" │ ")
            }
        }
    }
    fmt.Println()
}

func main() {
    log.SetFlags(0)
    start := time.Now()

    // Create new elasticsearch client ...
    //
    es, err := elasticsearch.NewClient(
        elasticsearch.Config{
            // >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
            // ... 使用"ochttp" 封裝檢測儀
            Transport: &ochttp.Transport{},
            // <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<
        })
    if err != nil {
        log.Fatalf("ERROR: %s", err)
    }

    // 創(chuàng)建 done 管道
    //
    done := make(chan os.Signal)
    signal.Notify(done, os.Interrupt)

    // 創(chuàng)建刻點
    //
    tickers := struct {
        Info   *time.Ticker
        Index  *time.Ticker
        Health *time.Ticker
        Search *time.Ticker
    }{
        Info:   time.NewTicker(time.Second),
        Index:  time.NewTicker(500 * time.Millisecond),
        Health: time.NewTicker(5 * time.Second),
        Search: time.NewTicker(10 * time.Second),
    }
    defer tickers.Info.Stop()
    defer tickers.Index.Stop()
    defer tickers.Health.Stop()
    defer tickers.Search.Stop()

    // 使用 ochttp 插件注冊視圖
    //
    if err := view.Register(
        ochttp.ClientRoundtripLatencyDistribution,
        ochttp.ClientCompletedCount,
    ); err != nil {
        log.Fatalf("ERROR: %s", err)
    }

    // 一段時間向 STDOUT 報告一次視圖
    //
    view.SetReportingPeriod(5 * time.Second)
    view.RegisterExporter(&ConsoleExporter{})

    // 向 STDOUT 報告一部分軌跡
    //
    trace.ApplyConfig(trace.Config{DefaultSampler: trace.ProbabilitySampler(0.5)})
    trace.RegisterExporter(&ConsoleExporter{})

    // 初始化上下文
    //
    ctx, _ := tag.New(context.Background(), tag.Upsert(tagMethod, "main"))

    // 調(diào)用 api
    //
    for {
        select {
        case <-done:
            fmt.Print("\n")
            fmt.Println(strings.Repeat("━", tWidth))
            faint.Printf("Finished in %s\n\n", time.Now().Sub(start).Truncate(time.Second))
            return

        // -> Info
        //
        case <-tickers.Info.C:
            res, err := es.Info(es.Info.WithContext(ctx))
            if err != nil {
                boldRed.Printf("Error getting response: %s\n", err)
            } else {
                res.Body.Close()
            }

        // -> Index
        //
        case t := <-tickers.Index.C:
            // Artificially fail some requests...
            var body io.Reader
            if t.Second()%4 == 0 {
                body = strings.NewReader(``)
            } else {
                body = strings.NewReader(`{"timestamp":"` + t.Format(time.RFC3339) + `"}`)
            }

            res, err := es.Index("test", body, es.Index.WithContext(ctx))
            if err != nil {
                boldRed.Printf("Error getting response: %s\n", err)
            } else {
                res.Body.Close()
            }

        // -> Health
        //
        case <-tickers.Health.C:
            res, err := es.Cluster.Health(
                es.Cluster.Health.WithLevel("indices"),
                es.Cluster.Health.WithContext(ctx),
            )
            if err != nil {
                boldRed.Printf("Error getting response: %s\n", err)
            } else {
                res.Body.Close()
            }

        // -> Search
        //
        case <-tickers.Search.C:
            res, err := es.Search(
                es.Search.WithIndex("test"),
                es.Search.WithSort("timestamp:desc"),
                es.Search.WithSize(1),
                es.Search.WithContext(ctx),
            )
            if err != nil {
                boldRed.Printf("Error getting response: %s\n", err)
            } else {
                res.Body.Close()
            }
        }
    }
}

2 Elastic APM

使用Go agent for Elastic APM檢測客戶端:

  • 配置多種類型的事務(wù)
  • 在每個事務(wù)中創(chuàng)建自定義跨度
  • 報告錯誤

使用 docker 示例。

2.1 docker-compose 配置文件

文件名:elasticstack.yml炕横。

version: "3.2"

services:
  # --- Application -----------------------------------------------------------
  application:
    container_name: application
    build: .
    networks: ["elasticstack"]
    depends_on:
      - elasticsearch
      - kibana
      - apm-server

  # --- Elasticsearch ---------------------------------------------------------
  elasticsearch:
    image: elasticsearch:7.12.1
    container_name: elasticsearch
    volumes:
      - es_data:/usr/share/elasticsearch/data:delegated
    networks: ["elasticstack"]
    environment:
      - "cluster.name=go-elasticsearch-instrumentation"
      - "cluster.routing.allocation.disk.threshold_enabled=false"
      - "discovery.type=single-node"
      - "bootstrap.memory_lock=true"
      - "xpack.security.enabled=false"
      - "ES_JAVA_OPTS=-Xms1g -Xmx1g"
    expose:
      - "9200"
    ulimits:
      memlock: -1
      nproc: 65535
      nofile: 65535
    healthcheck:
      test: curl --max-time 60 --retry 60 --retry-delay 1 --retry-connrefused --show-error --silent http://localhost:9200

  # --- Kibana ----------------------------------------------------------------
  kibana:
    image: kibana:7.12.1
    container_name: kibana
    networks: ["elasticstack"]
    environment:
      - "ELASTICSEARCH_URL=http://elasticsearch:9200"
    ports:
      - "5601:5601"
    depends_on: ["elasticsearch"]
    healthcheck:
      test: curl --max-time 60 --retry 60 --retry-delay 1 --retry-connrefused --show-error --silent http://localhost:5601

  # --- APM Server ------------------------------------------------------------
  apm-server:
    image: elastic/apm-server:7.12.1
    container_name: apm_server
    networks: ["elasticstack"]
    command: >
      ./apm-server run -e \
        -E output.elasticsearch.hosts=http://elasticsearch:9200 \
        -E setup.kibana.host=http://kibana:5601
    expose:
      - "8200"
    depends_on: ["elasticsearch", "kibana"]
    healthcheck:
      test: curl --max-time 60 --retry 60 --retry-delay 1 --retry-connrefused --show-error --silent http://localhost:8200/healthcheck

networks:
  elasticstack:

volumes:
  es_data:

2.2 Dockerfile

FROM golang:1.16.3

RUN echo 'deb http://mirrors.aliyun.com/debian/ buster main non-free contrib\n\
deb http://mirrors.aliyun.com/debian-security buster/updates main\n\
deb http://mirrors.aliyun.com/debian/ buster-updates main non-free contrib\n\
deb http://mirrors.aliyun.com/debian/ buster-backports main non-free contrib\n'\
> /etc/apt/sources.list
RUN apt-get update && apt-get install -y gcc g++ ca-certificates make curl git jq

WORKDIR /go-elasticsearch-demo-instrumentation

RUN go env -w GO111MODULE=on && go env -w GOPROXY=https://goproxy.cn,direct

COPY go.mod .
RUN go mod download

ENV TERM xterm-256color

ENV ELASTICSEARCH_URL=${ELASTICSEARCH_URL:-http://elasticsearch:9200}

ENV ELASTIC_APM_SERVER_URL=${ELASTIC_APM_SERVER_URL:-http://apm_server:8200}
ENV ELASTIC_APM_SERVICE_NAME=go-elasticsearch-demo-instrumentation
ENV ELASTIC_APM_METRICS_INTERVAL=5s
ENV ELASTIC_APM_LOG_FILE=stderr
ENV ELASTIC_APM_LOG_LEVEL=debug

COPY apmelasticsearch.go opencensus.go ./

CMD go run apmelasticsearch.go

2.3 運行

elasticstack.yml 和 Dockfile 放在同一目錄源内。

運行:

docker-compose --file elasticstack.yml up --build

刪除示例 docker 文件:

docker-compose --file elasticstack.yml down --remove-orphans --volumes

八、擴(kuò)展

此例演示如何擴(kuò)展客戶端份殿,以便調(diào)用自定義 API膜钓。

main.go示例中定義了嵌入到常規(guī)客戶端的自定義類型,添加一個實現(xiàn)了Example()方法的Custom命名空間卿嘲。

package main

import (
    "github.com/elastic/go-elasticsearch/v8"
    "github.com/elastic/go-elasticsearch/v8/esapi"
    "github.com/elastic/go-elasticsearch/v8/estransport"
)

const port = "9209"

// ExtendedClient 包括常規(guī) api 和自定義 api
type ExtendedClient struct {
    *elasticsearch.Client
    Custom *ExtendedAPI
}

// ExtendedAPI 自定義 api
type ExtendedAPI struct {
    *elasticsearch.Client
}

// Example 調(diào)用自定義 restful api颂斜,"GET /_cat/example"
func (e *ExtendedAPI) Example() (*esapi.Response, error) {
    req, _ := http.NewRequest("GET", "/_cat/example", nil)

    res, err := e.Perform(req)
    if err != nil {
        return nil, err
    }

    return &esapi.Response{StatusCode: res.StatusCode, Body: res.Body, Header: res.Header}, nil
}

func main() {
    log.SetFlags(0)

    started := make(chan bool)

    // 啟動代理服務(wù)
    go startServer(started)

    ec, err := elasticsearch.NewClient(elasticsearch.Config{
        Addresses: []string{"http://localhost:" + port},
        Logger:    &estransport.ColorLogger{Output: os.Stdout, EnableRequestBody: true, EnableResponseBody: true},
    })
    if err != nil {
        log.Fatalf("Error creating the client: %s", err)
    }

    es := ExtendedClient{
        Client: ec,
        Custom: &ExtendedAPI{ec},
    }
    <-started

    // 調(diào)用常規(guī) api
    es.Cat.Health()

    // 調(diào)用自定義 api
    es.Custom.Example()

}

func startServer(started chan<- bool) {
    proxy := httputil.NewSingleHostReverseProxy(&url.URL{Scheme: "http", Host: "localhost:9200"})

    // 在"GET /_cat/example"上以自定義內(nèi)容響應(yīng),將其他請求代理到 es
    //
    http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
        if r.Method == "GET" && r.URL.Path == "/_cat/example" {
            io.WriteString(w, "Hello from Cat Example action")
            return
        }
        proxy.ServeHTTP(w, r)
    })

    ln, err := net.Listen("tcp", "localhost:"+port)
    if err != nil {
        log.Fatalf("Unable to start server: %s", err)
    }

    go http.Serve(ln, nil)
    started <- true
}

結(jié)果:

截屏2021-04-30 18.45.28

九拾枣、安全

本例演示如何通過用自定義證書使用 TLS (傳輸層安全) 來加密和驗證與 ES 集群的通信沃疮。

1 為集群創(chuàng)建證書

創(chuàng)建證書的命令如下:

OUTPUT="/certificates/bundle.zip"
if [[ -f $$OUTPUT ]]; then
  echo "Certificates already present in [.$$OUTPUT]"; exit 1;
else
  yum install -y -q -e 0 unzip tree;
  bin/elasticsearch-certutil cert \
    --pem \
    --days 365 \
    --keep-ca-key \
    --in config/certificates/certificates-config.yml \
    --out $$OUTPUT;
  unzip -q $$OUTPUT -d /certificates;
  chown -R 1000:0 /certificates; echo;
  tree /certificates;
fi;

命令中用到的certificates-config.yml內(nèi)容如下:

instances:
  - name: elasticsearch
    ip:  [0.0.0.0, 127.0.0.1]
    dns: ["localhost", "example_elasticsearch_1", "example_elasticsearch_2", "example_elasticsearch_3"]

既然是示例,創(chuàng)建一個即用即毀的環(huán)境是很有必要的梅肤,所以使用一個 docker 容器運行上面的命令是最好的選擇司蔬。

docker-compose 文件:

version: "3.7"

services:
  create_certificates:
    image: elasticsearch:7.12.1
    container_name: certificates_generator
    user: root
    working_dir: /usr/share/elasticsearch
    command: >
      bash -c '
        OUTPUT="/certificates/bundle.zip"
        if [[ -f $$OUTPUT ]]; then
          echo "Certificates already present in [.$$OUTPUT]"; exit 1;
        else
          yum install -y -q -e 0 unzip tree;
          bin/elasticsearch-certutil cert \
            --pem \
            --days 365 \
            --keep-ca-key \
            --in config/certificates/certificates-config.yml \
            --out $$OUTPUT;
          unzip -q $$OUTPUT -d /certificates;
          chown -R 1000:0 /certificates; echo;
          tree /certificates;
        fi;
      '
    volumes:
      - ./certificates:/certificates
      - ./certificates-config.yml:/usr/share/elasticsearch/config/certificates/certificates-config.yml

運行 docker 容器:

docker-compose --file certificates-create.yml run --rm create_certificates

運行完成后,當(dāng)前目錄中就會生成一個包含證書的certificates文件夾姨蝴。

現(xiàn)在有了證書葱她,下面就可以創(chuàng)建一個開啟安全認(rèn)證的集群。

docker-compose 文件:

version: "3.7"

services:
  elasticsearch:
    image: elasticsearch:${ELASTIC_VERSION}
    volumes:
      - es-data:/usr/share/elasticsearch/data
      - ./certificates:/usr/share/elasticsearch/config/certificates/
    networks:
      - elasticstack
    ports:
      - 9200:9200
    environment:
      - node.name=example_elasticsearch_1
      - cluster.name=golang-example-security
      - cluster.initial_master_nodes=example_elasticsearch_1
      - discovery.seed_hosts=example_elasticsearch_1
      - bootstrap.memory_lock=true
      - network.host=example_elasticsearch_1,_local_
      - network.publish_host=example_elasticsearch_1
      - ES_JAVA_OPTS=-Xms1G -Xmx1G -Des.transport.cname_in_publish_address=true
      # Security & TLS
      - ELASTIC_PASSWORD=${ELASTIC_PASSWORD}
      - xpack.security.enabled=true
      - xpack.security.http.ssl.enabled=true
      - xpack.security.http.ssl.key=/usr/share/elasticsearch/config/certificates/elasticsearch/elasticsearch.key
      - xpack.security.http.ssl.certificate=/usr/share/elasticsearch/config/certificates/elasticsearch/elasticsearch.crt
      - xpack.security.http.ssl.certificate_authorities=/usr/share/elasticsearch/config/certificates/ca/ca.crt
      - xpack.security.transport.ssl.enabled=true
      - xpack.security.transport.ssl.verification_mode=certificate
      - xpack.security.transport.ssl.key=/usr/share/elasticsearch/config/certificates/elasticsearch/elasticsearch.key
      - xpack.security.transport.ssl.certificate=/usr/share/elasticsearch/config/certificates/elasticsearch/elasticsearch.crt
      - xpack.security.transport.ssl.certificate_authorities=/usr/share/elasticsearch/config/certificates/ca/ca.crt
    ulimits: { nofile: { soft: 262144, hard: 262144 }, memlock: -1 }
    healthcheck:
      test: curl --cacert /usr/share/elasticsearch/config/certificates/ca/ca.crt --max-time 120 --retry 120 --retry-delay 1 --show-error --silent https://elastic:${ELASTIC_PASSWORD}@localhost:9200

networks:
  elasticstack: { labels: { elasticstack.description: "Network for the Elastic Stack" }}

volumes:
  es-data: { labels: { elasticstack.description: "Elasticsearch data" }}

運行集群:

docker-compose --file elasticsearch-cluster.yml up --remove-orphans --detach

用證書訪問配合賬號密碼訪問:

curl --cacert certificates/ca/ca.crt https://elastic:elastic@localhost:9200

會得到正確響應(yīng)似扔。

2 使用 Go 客戶端安全訪問

2.1 使用客戶端配置中的證書字段

先從文件中讀取證書內(nèi)容吨些,然后將證書放到客戶端配置中:

// --> 從文件中讀取證書
cert, _ := ioutil.ReadFile(*cacert)

es, _ := elasticsearch.NewClient(
    elasticsearch.Config{
        // ...

        // --> 將證書放到配置中
        CACert: cert,
    })

2.2 根據(jù)證書創(chuàng)建自定義傳輸

cert, _ := ioutil.ReadFile(*cacert)

// 復(fù)制默認(rèn)傳輸
tp := http.DefaultTransport.(*http.Transport).Clone()

// 初始化一個根證書頒發(fā)機(jī)構(gòu)
tp.TLSClientConfig.RootCAs, _ = x509.SystemCertPool()

// 添加自定義證書頒發(fā)機(jī)構(gòu)
tp.TLSClientConfig.RootCAs.AppendCertsFromPEM(cert)

es, _ := elasticsearch.NewClient(
  elasticsearch.Config{
    Addresses: []string{"https://localhost:9200"},
    Username:  "elastic",
    Password:  *password,

    // --> 將自定義傳輸添加到客戶端配置中
    //
    Transport: tp,
  },
)

十、示例應(yīng)用

爬取掘金熱門推薦的頁面的信息保存到 es 中炒辉,并進(jìn)行查詢豪墅。

示例項目地址:thep0y/juejin-hot-es-example

查詢時使用命令行進(jìn)行,示例項目的命令如下:

juejin allows you to index and search hot-recommended article's titles

Usage:
  juejin [command]

Available Commands:
  help        Help about any command
  index       Index juejin hot-recommended articles into Elasticsearch
  search      Search juejin hot recommended articles

Flags:
  -h, --help           help for juejin
  -i, --index string   Index name (default "juejin")

Use "juejin [command] --help" for more information about a command.

可選參數(shù)為indexsearch黔寇。

其中index也有可選命令:

      --pages int   The count of pages you want to crawl (default 5)
      --setup       Create Elasticsearch index

本項目使用的是本地 es 偶器,推薦用 docker 創(chuàng)建,es 中需要安裝 ik 中文分詞插件

1 創(chuàng)建索引

go run main.go index --setup

默認(rèn)會根據(jù)項目中指定的 mapping 創(chuàng)建索引屏轰,并爬取存儲 5 頁颊郎、共 100 條信息。

結(jié)果如下所示:

8:10PM INF Creating index with mapping
8:10PM INF Starting the crawl with 0 workers at 0 offset
8:10PM INF Stored doc Article ID=6957974706943164447 title="算法篇01霎苗、排序算法"
8:10PM INF Stored doc Article ID=6953868764362309639 title="如何處理瀏覽器的斷網(wǎng)情況姆吭?"
...
8:10PM INF Skipping existing doc ID=6957726578692341791
8:10PM INF Skipping existing doc ID=6957925118429364255
8:10PM INF Skipping existing doc ID=6953868764362309639
8:10PM INF Skipping existing doc ID=6957981912669519903
8:10PM INF Skipping existing doc ID=6953059119561441287
8:10PM INF Skipping existing doc ID=6955336007839383588
...
8:10PM INF Stored doc Article ID=6957930535574306847 title="Node系列-阻塞和非阻塞的理解"
8:10PM INF Stored doc Article ID=6956602138201948196 title="《前端領(lǐng)域的轉(zhuǎn)譯打包工具鏈》上篇"
8:10PM INF Stored doc Article ID=6957982556885090312 title="JS篇:事件流"

終端結(jié)果截圖:

截屏2021-05-03 20.14.12

因為每頁有 20 條,共爬 5 頁唁盏,所以理論上應(yīng)存儲 100 條信息内狸,但其中可能會存在幾條重復(fù)信息,所以最后保存時可能會小于 100 條厘擂。

2 爬取 10 頁

go run main.go index --pages 10

運行這條命令時昆淡,不會再創(chuàng)建索引,而是直接開始爬蟲刽严,因為只是示例項目昂灵,所以沒有增加起始頁和最終頁的選擇,只提供最終頁碼作為可選參數(shù)舞萄。

運行結(jié)果與上小節(jié)基本相同:

截屏2021-05-03 20.17.38

3 查詢

查詢時眨补,使用的是詞組查詢,中文更適合使用詞組查詢鹏氧,不然每個查詢詞被拆分成單字查詢渤涌,結(jié)果一般不是我們想要的。

go run main.go search 前端

查詢到的結(jié)果中會將查詢詞高亮顯示:

截屏2021-05-03 20.22.39
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末把还,一起剝皮案震驚了整個濱河市实蓬,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌吊履,老刑警劉巖安皱,帶你破解...
    沈念sama閱讀 211,194評論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異艇炎,居然都是意外死亡酌伊,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,058評論 2 385
  • 文/潘曉璐 我一進(jìn)店門缀踪,熙熙樓的掌柜王于貴愁眉苦臉地迎上來居砖,“玉大人,你說我怎么就攤上這事驴娃∽嗪颍” “怎么了?”我有些...
    開封第一講書人閱讀 156,780評論 0 346
  • 文/不壞的土叔 我叫張陵唇敞,是天一觀的道長蔗草。 經(jīng)常有香客問我咒彤,道長,這世上最難降的妖魔是什么咒精? 我笑而不...
    開封第一講書人閱讀 56,388評論 1 283
  • 正文 為了忘掉前任镶柱,我火速辦了婚禮,結(jié)果婚禮上模叙,老公的妹妹穿的比我還像新娘歇拆。我一直安慰自己,他們只是感情好向楼,可當(dāng)我...
    茶點故事閱讀 65,430評論 5 384
  • 文/花漫 我一把揭開白布查吊。 她就那樣靜靜地躺著谐区,像睡著了一般湖蜕。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上宋列,一...
    開封第一講書人閱讀 49,764評論 1 290
  • 那天昭抒,我揣著相機(jī)與錄音,去河邊找鬼炼杖。 笑死灭返,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的坤邪。 我是一名探鬼主播熙含,決...
    沈念sama閱讀 38,907評論 3 406
  • 文/蒼蘭香墨 我猛地睜開眼挎峦,長吁一口氣:“原來是場噩夢啊……” “哼览妖!你這毒婦竟也來了万哪?” 一聲冷哼從身側(cè)響起蟀瞧,我...
    開封第一講書人閱讀 37,679評論 0 266
  • 序言:老撾萬榮一對情侶失蹤凑队,失蹤者是張志新(化名)和其女友劉穎近弟,沒想到半個月后壳鹤,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體京髓,經(jīng)...
    沈念sama閱讀 44,122評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡盟劫,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,459評論 2 325
  • 正文 我和宋清朗相戀三年夜牡,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片侣签。...
    茶點故事閱讀 38,605評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡塘装,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出影所,到底是詐尸還是另有隱情蹦肴,我是刑警寧澤,帶...
    沈念sama閱讀 34,270評論 4 329
  • 正文 年R本政府宣布型檀,位于F島的核電站冗尤,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜裂七,卻給世界環(huán)境...
    茶點故事閱讀 39,867評論 3 312
  • 文/蒙蒙 一皆看、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧背零,春花似錦腰吟、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,734評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至侦镇,卻和暖如春灵疮,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背壳繁。 一陣腳步聲響...
    開封第一講書人閱讀 31,961評論 1 265
  • 我被黑心中介騙來泰國打工震捣, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人闹炉。 一個月前我還...
    沈念sama閱讀 46,297評論 2 360
  • 正文 我出身青樓蒿赢,卻偏偏與公主長得像,于是被迫代替她去往敵國和親渣触。 傳聞我的和親對象是個殘疾皇子羡棵,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 43,472評論 2 348

推薦閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn)嗅钻,斷路器皂冰,智...
    卡卡羅2017閱讀 134,629評論 18 139
  • http://liuxing.info/2017/06/30/Spring%20AMQP%E4%B8%AD%E6%...
    sherlock_6981閱讀 15,890評論 2 11
  • 去年有段時間得空,就把谷歌GAE的API權(quán)威指南看了一遍啊犬,收獲頗豐灼擂,特別是在自己幾乎獨立開發(fā)了公司的云數(shù)據(jù)中心之后...
    騎單車的勛爵閱讀 20,475評論 0 41
  • 7.1 分區(qū)分配策略 在 3.1 節(jié)中講述了消費者與消費組的模型,并且在默認(rèn)分區(qū)分配策略的背景下通過案例進(jìn) 行了具...
    tracy_668閱讀 1,047評論 0 2
  • 轉(zhuǎn)載:Dapper,大規(guī)模分布式系統(tǒng)的跟蹤系統(tǒng) 當(dāng)代的互聯(lián)網(wǎng)的服務(wù)语御,通常都是用復(fù)雜的峻贮、大規(guī)模分布式集群來實現(xiàn)的∮Υ常互...
    meng_philip123閱讀 1,060評論 0 7