diff --git a/docs/content.en/docs/release-notes/_index.md b/docs/content.en/docs/release-notes/_index.md index 2cf0fb2..7018405 100644 --- a/docs/content.en/docs/release-notes/_index.md +++ b/docs/content.en/docs/release-notes/_index.md @@ -12,6 +12,7 @@ Information about release notes of INFINI Gateway is provided here. ### Breaking changes ### Features +- Support simple bulk metadata in bulk related filters (#59) ### Bug fix diff --git a/proxy/filters/elastic/bulk_request_mutate.go b/proxy/filters/elastic/bulk_request_mutate.go index 9ee63da..63c63a4 100644 --- a/proxy/filters/elastic/bulk_request_mutate.go +++ b/proxy/filters/elastic/bulk_request_mutate.go @@ -66,29 +66,14 @@ func (this *ElasticsearchBulkRequestMutate) Filter(ctx *fasthttp.RequestCtx) { var bulkBuff *bytebufferpool.ByteBuffer = bytebufferpool.Get("bulk_mutate_request_docs") defer bytebufferpool.Put("bulk_mutate_request_docs", bulkBuff) var metaCollected bool - docCount, err := elastic.WalkBulkRequests(body, func(eachLine []byte) (skipNextLine bool) { + docCount, err := elastic.WalkBulkRequests(pathStr, body, func(eachLine []byte) (skipNextLine bool) { return false }, func(metaBytes []byte, actionStr, index, typeName, id, routing string, offset int) (err error) { metaCollected = false metaStr := util.UnsafeBytesToString(metaBytes) - - //url level - var urlLevelIndex string - var urlLevelType string - - urlLevelIndex, urlLevelType = elastic.ParseUrlLevelBulkMeta(pathStr) - var indexNew, typeNew, idNew string - if index == "" && urlLevelIndex != "" { - index = urlLevelIndex - indexNew = urlLevelIndex - } - if typeName != typeNew && typeName == "" && !this.RemoveTypeMeta && urlLevelType != "" { - typeName = urlLevelType - typeNew = urlLevelType - } if (actionStr == elastic.ActionIndex || actionStr == elastic.ActionCreate) && (len(id) == 0 || id == "null") && this.FixNilID { randID := util.GetUUID() if this.AddTimestampToID { @@ -110,6 +95,13 @@ func (this *ElasticsearchBulkRequestMutate) Filter(ctx *fasthttp.RequestCtx) { } } + //index should not be empty, or will be use the default index + if index == "" { + index = this.DefaultIndex + idNew = this.DefaultIndex + } + + //handle the index rename if index != "" && len(this.IndexNameRename) > 0 { v, ok := this.IndexNameRename[index] if ok { @@ -124,6 +116,7 @@ func (this *ElasticsearchBulkRequestMutate) Filter(ctx *fasthttp.RequestCtx) { } } + //handle the type rename if typeName != "" && !this.RemoveTypeMeta && len(this.TypeNameRename) > 0 { v, ok := this.TypeNameRename[typeName] if ok && v != typeName { diff --git a/proxy/filters/elastic/bulk_request_resort.go b/proxy/filters/elastic/bulk_request_resort.go index 3cc19c9..7270fae 100644 --- a/proxy/filters/elastic/bulk_request_resort.go +++ b/proxy/filters/elastic/bulk_request_resort.go @@ -312,7 +312,7 @@ func (filter *BulkRequestResort) Filter(ctx *fasthttp.RequestCtx) { docs := map[int][]elastic.VersionInfo{} - elastic.WalkBulkRequests(requestBody, nil, + elastic.WalkBulkRequests(pathStr, requestBody, nil, func(metaBytes []byte, actionStr, index, typeName, id, routing string, docCount int) (err error) { if lastRecord, collect = docsToReplicate[offset]; collect { lastRecord.Payload = append(lastRecord.Payload, bytes.Copy(metaBytes)) @@ -414,8 +414,6 @@ func (s *Sorter) run() { if docCount > 0 { - //log.Error("docCount:", docCount, " fetch documents:", len(documents)) - //if global.Env().IsDebug { stats.IncrementBy("bulk_request_resort", fmt.Sprintf("%v_%v_get", s.filter.inputQueueConfig.ID, s.partitionID), int64(docCount)) //} diff --git a/proxy/filters/elastic/bulk_reshuffle.go b/proxy/filters/elastic/bulk_reshuffle.go index 9ddd247..c3b9186 100644 --- a/proxy/filters/elastic/bulk_reshuffle.go +++ b/proxy/filters/elastic/bulk_reshuffle.go @@ -196,7 +196,7 @@ func (this *BulkReshuffle) Filter(ctx *fasthttp.RequestCtx) { var hitMetadataNotFound bool - docCount, err := elastic.WalkBulkRequests(body, func(eachLine []byte) (skipNextLine bool) { + docCount, err := elastic.WalkBulkRequests(pathStr, body, func(eachLine []byte) (skipNextLine bool) { if validEachLine { obj := map[string]interface{}{} err := util.FromJSONBytes(eachLine, &obj) @@ -214,26 +214,7 @@ func (this *BulkReshuffle) Filter(ctx *fasthttp.RequestCtx) { shardID := 0 - var indexNew, typeNew, idNew string - - //only handle empty index - if index == "" { - //url level - var urlLevelIndex string - var urlLevelType string - - urlLevelIndex, urlLevelType = elastic.ParseUrlLevelBulkMeta(pathStr) - - if index == "" && urlLevelIndex != "" { - index = urlLevelIndex - indexNew = urlLevelIndex - } - - if typeName == "" && urlLevelType != "" { - typeName = urlLevelType - typeNew = urlLevelType - } - } + var idNew string if (actionStr == elastic.ActionIndex || actionStr == elastic.ActionCreate) && (len(id) == 0 || id == "null") && fixNullID { id = util.GetUUID() @@ -243,9 +224,9 @@ func (this *BulkReshuffle) Filter(ctx *fasthttp.RequestCtx) { } } - if indexNew != "" || typeNew != "" || idNew != "" { + if idNew != "" { var err error - metaBytes, err = updateJsonWithNewIndex(actionStr, metaBytes, indexNew, typeNew, idNew) + metaBytes, err = elastic.UpdateBulkMetadata(actionStr, metaBytes, "", "", idNew) if err != nil { panic(err) } @@ -630,34 +611,3 @@ func batchUpdateJson(scannedByte []byte, action string, set, del map[string]stri return scannedByte, err } - -func updateJsonWithNewIndex(action string, scannedByte []byte, index, typeName, id string) (newBytes []byte, err error) { - - if global.Env().IsDebug { - log.Trace("update:", action, ",", index, ",", typeName, ",", id) - } - - newBytes = make([]byte, len(scannedByte)) - copy(newBytes, scannedByte) - - if index != "" { - newBytes, err = jsonparser.Set(newBytes, []byte("\""+index+"\""), action, "_index") - if err != nil { - return newBytes, err - } - } - if typeName != "" { - newBytes, err = jsonparser.Set(newBytes, []byte("\""+typeName+"\""), action, "_type") - if err != nil { - return newBytes, err - } - } - if id != "" { - newBytes, err = jsonparser.Set(newBytes, []byte("\""+id+"\""), action, "_id") - if err != nil { - return newBytes, err - } - } - - return newBytes, err -} diff --git a/proxy/filters/elastic/bulk_reshuffle_test.go b/proxy/filters/elastic/bulk_reshuffle_test.go index 38f7fb0..dfeabe3 100644 --- a/proxy/filters/elastic/bulk_reshuffle_test.go +++ b/proxy/filters/elastic/bulk_reshuffle_test.go @@ -25,10 +25,8 @@ package elastic import ( "fmt" - "github.com/buger/jsonparser" "github.com/magiconair/properties/assert" "infini.sh/framework/core/elastic" - "infini.sh/framework/core/util" "strings" "testing" ) @@ -72,125 +70,6 @@ func TestBulkReshuffle_MixedRequests(t *testing.T) { fmt.Println(string(data)) } -func TestParseActionMeta1(t *testing.T) { - - data := []byte("{\"index\":{\"_index\":\"medcl1\",\"_type\":\"_doc\",\"_id\":\"GZq-bnYBC53QmW9Kk2ve\"}}") - action := util.ExtractFieldFromBytes(&data, elastic.ActionStart, elastic.ActionEnd, nil) - fmt.Println(string(action)) - indexb, _, _, _ := jsonparser.Get(data, util.UnsafeBytesToString(action), "_index") - fmt.Println(string(indexb)) - assert.Equal(t, string(action), "index") - assert.Equal(t, string(indexb), "medcl1") - idb, _, _, _ := jsonparser.Get(data, util.UnsafeBytesToString(action), "_id") - assert.Equal(t, string(idb), "GZq-bnYBC53QmW9Kk2ve") - - //update json bytes - new, _ := jsonparser.Set(data, []byte("medcl2"), "index", "_index") - fmt.Println("new:", string(new)) - -} - -func TestParseActionMeta2(t *testing.T) { - - data := []byte("{\"index\":{\"_index\":\"medcl1\",\"_type\":\"_doc\",\"_id\":\"GZq-bnYBC53QmW9Kk2ve\"}}") - - action, indexb, typeb, idb, _, _ := elastic.ParseActionMeta(data) - fmt.Println(string(action), string(indexb), string(idb)) - assert.Equal(t, string(action), "index") - assert.Equal(t, string(indexb), "medcl1") - assert.Equal(t, string(typeb), "_doc") - assert.Equal(t, string(idb), "GZq-bnYBC53QmW9Kk2ve") - - data = []byte("{\"index\":{\"_type\":\"_doc\",\"_id\":\"GZq-bnYBC53QmW9Kk2ve\",\"_index\":\"medcl1\"}}") - - action, indexb, typeb, idb, _, _ = elastic.ParseActionMeta(data) - - fmt.Println(string(action), string(indexb), string(idb)) - assert.Equal(t, string(action), "index") - assert.Equal(t, string(indexb), "medcl1") - assert.Equal(t, string(typeb), "_doc") - assert.Equal(t, string(idb), "GZq-bnYBC53QmW9Kk2ve") - - data = []byte("{\"index\":{\"_id\":\"GZq-bnYBC53QmW9Kk2ve\",\"_type\":\"_doc\",\"_index\":\"medcl1\"}}") - - action, indexb, typeb, idb, _, _ = elastic.ParseActionMeta(data) - - fmt.Println(string(action), string(indexb), string(idb)) - assert.Equal(t, string(action), "index") - assert.Equal(t, string(indexb), "medcl1") - assert.Equal(t, string(typeb), "_doc") - assert.Equal(t, string(idb), "GZq-bnYBC53QmW9Kk2ve") - - data = []byte("{\"index\":{\"_index\":\"test\",\"_type\":\"doc\"}}") - action, indexb, typeb, idb, _, _ = elastic.ParseActionMeta(data) - - fmt.Println(string(action), string(indexb), string(idb)) - assert.Equal(t, string(action), "index") - assert.Equal(t, string(indexb), "test") - assert.Equal(t, string(typeb), "doc") - assert.Equal(t, string(idb), "") - - data = []byte("{\"delete\":{\"_index\":\"test\",\"_type\":\"_doc\"}}") - action, indexb, typeb, idb, _, _ = elastic.ParseActionMeta(data) - - fmt.Println(string(action), string(indexb), string(idb)) - assert.Equal(t, string(action), "delete") - assert.Equal(t, string(indexb), "test") - assert.Equal(t, string(typeb), "_doc") - assert.Equal(t, string(idb), "") - - data = []byte("{\"create\":{\"_index\":\"test\",\"_type\":\"_doc\"}}") - action, indexb, typeb, idb, _, _ = elastic.ParseActionMeta(data) - - fmt.Println(string(action), string(indexb), string(idb)) - assert.Equal(t, string(action), "create") - assert.Equal(t, string(indexb), "test") - assert.Equal(t, string(typeb), "_doc") - assert.Equal(t, string(idb), "") - - data = []byte("{ \"update\" : {\"_id\" : \"1\", \"_index\" : \"test\"} }") - action, indexb, typeb, idb, _, _ = elastic.ParseActionMeta(data) - - fmt.Println(string(action), string(indexb), string(idb)) - assert.Equal(t, string(action), "update") - assert.Equal(t, string(indexb), "test") - assert.Equal(t, string(typeb), "") - assert.Equal(t, string(idb), "1") - - data = []byte("{ \"update\" : {\"_index\" : \"test\"} }") - action, indexb, typeb, idb, _, _ = elastic.ParseActionMeta(data) - - fmt.Println(string(action), string(indexb), string(idb)) - assert.Equal(t, string(action), "update") - assert.Equal(t, string(indexb), "test") - assert.Equal(t, string(typeb), "") - assert.Equal(t, string(idb), "") - -} - -func TestParseActionMeta3(t *testing.T) { - - data := []byte("{\"index\":{\"_index\":\"medcl1\",\"_type\":\"_doc\",\"_id\":\"GZq-bnYBC53QmW9Kk2ve\"}}") - newData, err := updateJsonWithNewIndex("index", data, "newIndex", "newType", "newId") - fmt.Println(err, string(newData)) - assert.Equal(t, string(newData), "{\"index\":{\"_index\":\"newIndex\",\"_type\":\"newType\",\"_id\":\"newId\"}}") - - data = []byte("{\"index\":{\"_index\":\"medcl1\",\"_id\":\"GZq-bnYBC53QmW9Kk2ve\"}}") - newData, err = updateJsonWithNewIndex("index", data, "newIndex", "newType", "newId") - fmt.Println(err, string(newData)) - assert.Equal(t, string(newData), "{\"index\":{\"_index\":\"newIndex\",\"_id\":\"newId\",\"_type\":\"newType\"}}") - - data = []byte("{\"index\":{\"_index\":\"medcl1\",\"_type\":\"doc1\"}}") - newData, err = updateJsonWithNewIndex("index", data, "newIndex", "newType", "newId") - fmt.Println(err, string(newData)) - assert.Equal(t, string(newData), "{\"index\":{\"_index\":\"newIndex\",\"_type\":\"newType\",\"_id\":\"newId\"}}") - - data = []byte("{\"index\":{\"_index\":\"medcl1\",\"_type\":\"doc1\"}}") - newData, err = updateJsonWithNewIndex("index", data, "", "", "newId") - fmt.Println(err, string(newData)) - assert.Equal(t, string(newData), "{\"index\":{\"_index\":\"medcl1\",\"_type\":\"doc1\",\"_id\":\"newId\"}}") -} - func TestGetUrlLevelMeta(t *testing.T) { pathStr := "/index/_bulk" diff --git a/proxy/filters/security/ldap/ldap_test.go b/proxy/filters/security/ldap/ldap_test.go index 7d6b393..3fa878e 100644 --- a/proxy/filters/security/ldap/ldap_test.go +++ b/proxy/filters/security/ldap/ldap_test.go @@ -45,7 +45,6 @@ func TestLDAPFunctions(t *testing.T) { UserFilter: "(uid=%s)", } - r := &fasthttp.Request{} r.SetBasicAuth("galieleo", "password") diff --git a/proxy/filters/throttle/bulk_request_throttle.go b/proxy/filters/throttle/bulk_request_throttle.go index bd7722e..291768f 100644 --- a/proxy/filters/throttle/bulk_request_throttle.go +++ b/proxy/filters/throttle/bulk_request_throttle.go @@ -63,18 +63,9 @@ func (this *ElasticsearchBulkRequestThrottle) Filter(ctx *fasthttp.RequestCtx) { var indexOpStats = map[string]int{} var indexPayloadStats = map[string]int{} - docCount, err := elastic.WalkBulkRequests(body, func(eachLine []byte) (skipNextLine bool) { + docCount, err := elastic.WalkBulkRequests(pathStr, body, func(eachLine []byte) (skipNextLine bool) { return false }, func(metaBytes []byte, actionStr, index, typeName, id, routing string, offset int) (err error) { - if index == "" { - //url level - var urlLevelIndex string - urlLevelIndex, _ = elastic.ParseUrlLevelBulkMeta(pathStr) - if urlLevelIndex != "" { - index = urlLevelIndex - } - } - //stats v, ok := indexOpStats[index] if !ok {