221 lines
		
	
	
		
			5.6 KiB
		
	
	
	
		
			Go
		
	
	
	
			
		
		
	
	
			221 lines
		
	
	
		
			5.6 KiB
		
	
	
	
		
			Go
		
	
	
	
| package elastic
 | ||
| 
 | ||
| import (
 | ||
| 	"bytes"
 | ||
| 	"context"
 | ||
| 	"encoding/json"
 | ||
| 	"log"
 | ||
| 	"sync/atomic"
 | ||
| 	"time"
 | ||
| 
 | ||
| 	"git.apinb.com/bsm-sdk/core/vars"
 | ||
| 	"github.com/elastic/go-elasticsearch/v9"
 | ||
| 	"github.com/elastic/go-elasticsearch/v9/esapi"
 | ||
| 	"github.com/elastic/go-elasticsearch/v9/esutil"
 | ||
| )
 | ||
| 
 | ||
| type ES struct {
 | ||
| 	Client *elasticsearch.Client
 | ||
| }
 | ||
| 
 | ||
| func NewElastic(endpoints []string, username, password string) (*ES, error) {
 | ||
| 
 | ||
| 	cfg := elasticsearch.Config{
 | ||
| 		Addresses: endpoints,
 | ||
| 		Username:  username,
 | ||
| 		Password:  password,
 | ||
| 	}
 | ||
| 	var err error
 | ||
| 	client, err := elasticsearch.NewClient(cfg)
 | ||
| 	if err != nil {
 | ||
| 		return nil, err
 | ||
| 	}
 | ||
| 
 | ||
| 	return &ES{
 | ||
| 		Client: client,
 | ||
| 	}, nil
 | ||
| }
 | ||
| 
 | ||
| // idx 为空,默认随机唯一字符串
 | ||
| //
 | ||
| //	doc := map[string]interface{}{
 | ||
| //		"title":   "中国",
 | ||
| //		"content": "中国早日统一台湾",
 | ||
| //		"time":    time.Now().Unix(),
 | ||
| //		"date":    time.Now(),
 | ||
| //	}
 | ||
| func (es *ES) CreateDocument(index string, id string, doc *interface{}) {
 | ||
| 	var buf bytes.Buffer
 | ||
| 	if err := json.NewEncoder(&buf).Encode(doc); err != nil {
 | ||
| 		log.Println("Elastic NewEncoder:", err)
 | ||
| 	}
 | ||
| 	// Set up the request object.
 | ||
| 	req := esapi.IndexRequest{
 | ||
| 		Index:      index,
 | ||
| 		DocumentID: id,
 | ||
| 		Body:       &buf,
 | ||
| 		Refresh:    "true",
 | ||
| 	}
 | ||
| 
 | ||
| 	// Perform the request with the client.
 | ||
| 	res, err := req.Do(context.Background(), es.Client)
 | ||
| 	if err != nil {
 | ||
| 		log.Println("Elastic Error:", res.String())
 | ||
| 	}
 | ||
| 	defer res.Body.Close()
 | ||
| 
 | ||
| 	if res.IsError() {
 | ||
| 		log.Println("Elastic Error:", res.String())
 | ||
| 	}
 | ||
| }
 | ||
| 
 | ||
| // 批量写入文档。
 | ||
| // Action field configures the operation to perform (index, create, delete, update)
 | ||
| // create 如果文档不存在就创建,但如果文档存在就返回错误
 | ||
| // index 如果文档不存在就创建,如果文档存在就更新
 | ||
| // update 更新一个文档,如果文档不存在就返回错误
 | ||
| // delete 删除一个文档,如果要删除的文档id不存在,就返回错误
 | ||
| func (es *ES) Batch(index string, documens []map[string]interface{}, action string) {
 | ||
| 	log.SetFlags(0)
 | ||
| 
 | ||
| 	var (
 | ||
| 		countSuccessful uint64
 | ||
| 
 | ||
| 		err error
 | ||
| 	)
 | ||
| 
 | ||
| 	// >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
 | ||
| 	//
 | ||
| 	// Create the BulkIndexer
 | ||
| 	//
 | ||
| 	// NOTE: For optimal performance, consider using a third-party JSON decoding package.
 | ||
| 	//       See an example in the "benchmarks" folder.
 | ||
| 	//
 | ||
| 	bi, err := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
 | ||
| 		Index:         index,             // The default index name
 | ||
| 		Client:        es.Client,         // The Elasticsearch client
 | ||
| 		NumWorkers:    vars.ESNumWorkers, // The number of worker goroutines
 | ||
| 		FlushBytes:    vars.ESFlushBytes, // The flush threshold in bytes
 | ||
| 		FlushInterval: 30 * time.Second,  // The periodic flush interval
 | ||
| 	})
 | ||
| 	if err != nil {
 | ||
| 		log.Fatalf("Error creating the indexer: %s", err)
 | ||
| 	}
 | ||
| 
 | ||
| 	for _, doc := range documens {
 | ||
| 		id := doc["id"].(string)
 | ||
| 		// Prepare the data payload: encode article to JSON
 | ||
| 		data, err := json.Marshal(doc)
 | ||
| 		if err != nil {
 | ||
| 			log.Fatalf("Cannot encode documen %s: %s", id, err)
 | ||
| 		}
 | ||
| 
 | ||
| 		// >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
 | ||
| 		//
 | ||
| 		// Add an item to the BulkIndexer
 | ||
| 		//
 | ||
| 		err = bi.Add(
 | ||
| 			context.Background(),
 | ||
| 			esutil.BulkIndexerItem{
 | ||
| 
 | ||
| 				Action: action,
 | ||
| 
 | ||
| 				// DocumentID is the (optional) document ID
 | ||
| 				DocumentID: id,
 | ||
| 
 | ||
| 				// Body is an `io.Reader` with the payload
 | ||
| 				Body: bytes.NewReader(data),
 | ||
| 
 | ||
| 				// OnSuccess is called for each successful operation
 | ||
| 				OnSuccess: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem) {
 | ||
| 					atomic.AddUint64(&countSuccessful, 1)
 | ||
| 				},
 | ||
| 
 | ||
| 				// OnFailure is called for each failed operation
 | ||
| 				OnFailure: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem, err error) {
 | ||
| 					if err != nil {
 | ||
| 						log.Printf("ERROR: %s", err)
 | ||
| 					} else {
 | ||
| 						log.Printf("ERROR: %s: %s", res.Error.Type, res.Error.Reason)
 | ||
| 					}
 | ||
| 				},
 | ||
| 			},
 | ||
| 		)
 | ||
| 		if err != nil {
 | ||
| 			log.Printf("Unexpected error: %s", err)
 | ||
| 		}
 | ||
| 		// <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<
 | ||
| 	}
 | ||
| 
 | ||
| 	// >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
 | ||
| 	// Close the indexer
 | ||
| 	//
 | ||
| 	if err := bi.Close(context.Background()); err != nil {
 | ||
| 		log.Printf("Unexpected error: %s", err)
 | ||
| 	}
 | ||
| 
 | ||
| 	stats := bi.Stats()
 | ||
| 	if stats.NumFailed > 0 {
 | ||
| 		log.Printf("Indexed [%d] documents with [%d] errors", stats.NumFlushed, stats.NumFailed)
 | ||
| 	} else {
 | ||
| 		log.Printf("Successfully indexed [%d] documents", stats.NumFlushed)
 | ||
| 	}
 | ||
| }
 | ||
| 
 | ||
| func (es *ES) Search(index string, query map[string]interface{}) (res *esapi.Response, err error) {
 | ||
| 	var buf bytes.Buffer
 | ||
| 	if err = json.NewEncoder(&buf).Encode(query); err != nil {
 | ||
| 		return
 | ||
| 	}
 | ||
| 	// Perform the search request.
 | ||
| 	res, err = es.Client.Search(
 | ||
| 		es.Client.Search.WithContext(context.Background()),
 | ||
| 		es.Client.Search.WithIndex(index),
 | ||
| 		es.Client.Search.WithBody(&buf),
 | ||
| 		es.Client.Search.WithTrackTotalHits(true),
 | ||
| 		es.Client.Search.WithFrom(0),
 | ||
| 		es.Client.Search.WithSize(10),
 | ||
| 		es.Client.Search.WithSort("time:desc"),
 | ||
| 		es.Client.Search.WithPretty(),
 | ||
| 	)
 | ||
| 	if err != nil {
 | ||
| 		return
 | ||
| 	}
 | ||
| 	defer res.Body.Close()
 | ||
| 
 | ||
| 	return
 | ||
| }
 | ||
| 
 | ||
| // 删除 index 根据 索引名 id
 | ||
| func (es *ES) Delete(index, idx string) (res *esapi.Response, err error) {
 | ||
| 	res, err = es.Client.Delete(
 | ||
| 		index, // Index name
 | ||
| 		idx,   // Document ID
 | ||
| 		es.Client.Delete.WithRefresh("true"),
 | ||
| 	)
 | ||
| 	if err != nil {
 | ||
| 		return
 | ||
| 	}
 | ||
| 	defer res.Body.Close()
 | ||
| 
 | ||
| 	return
 | ||
| }
 | ||
| 
 | ||
| func (es *ES) DeleteByQuery(index []string, query map[string]interface{}) (res *esapi.Response, err error) {
 | ||
| 	var buf bytes.Buffer
 | ||
| 	if err = json.NewEncoder(&buf).Encode(query); err != nil {
 | ||
| 		return
 | ||
| 	}
 | ||
| 	// Perform the search request.
 | ||
| 	res, err = es.Client.DeleteByQuery(
 | ||
| 		index,
 | ||
| 		&buf,
 | ||
| 	)
 | ||
| 	if err != nil {
 | ||
| 		return
 | ||
| 	}
 | ||
| 	defer res.Body.Close()
 | ||
| 
 | ||
| 	return
 | ||
| }
 |