From 07764296e4ee115df413f2a6941ea056828a9254 Mon Sep 17 00:00:00 2001 From: medcl Date: Mon, 13 Jan 2025 16:59:25 +0800 Subject: [PATCH 1/5] feat: support simple bulk metadata --- proxy/filters/elastic/bulk_request_mutate.go | 33 ++++---- proxy/filters/elastic/bulk_request_resort.go | 2 +- proxy/filters/elastic/bulk_reshuffle.go | 78 ++++++------------- proxy/filters/security/ldap/ldap_test.go | 1 - .../filters/throttle/bulk_request_throttle.go | 18 ++--- 5 files changed, 54 insertions(+), 78 deletions(-) diff --git a/proxy/filters/elastic/bulk_request_mutate.go b/proxy/filters/elastic/bulk_request_mutate.go index 9ee63da..bfbfb4b 100644 --- a/proxy/filters/elastic/bulk_request_mutate.go +++ b/proxy/filters/elastic/bulk_request_mutate.go @@ -66,7 +66,7 @@ func (this *ElasticsearchBulkRequestMutate) Filter(ctx *fasthttp.RequestCtx) { var bulkBuff *bytebufferpool.ByteBuffer = bytebufferpool.Get("bulk_mutate_request_docs") defer bytebufferpool.Put("bulk_mutate_request_docs", bulkBuff) var metaCollected bool - docCount, err := elastic.WalkBulkRequests(body, func(eachLine []byte) (skipNextLine bool) { + docCount, err := elastic.WalkBulkRequests(pathStr, body, func(eachLine []byte) (skipNextLine bool) { return false }, func(metaBytes []byte, actionStr, index, typeName, id, routing string, offset int) (err error) { metaCollected = false @@ -74,21 +74,20 @@ func (this *ElasticsearchBulkRequestMutate) Filter(ctx *fasthttp.RequestCtx) { metaStr := util.UnsafeBytesToString(metaBytes) //url level - var urlLevelIndex string - var urlLevelType string - - urlLevelIndex, urlLevelType = elastic.ParseUrlLevelBulkMeta(pathStr) + //var urlLevelIndex string + //var urlLevelType string + //urlLevelIndex, urlLevelType = elastic.ParseUrlLevelBulkMeta(pathStr) var indexNew, typeNew, idNew string - if index == "" && urlLevelIndex != "" { - index = urlLevelIndex - indexNew = urlLevelIndex - } + //if index == "" && urlLevelIndex != "" { + // index = urlLevelIndex + // indexNew = 
urlLevelIndex + //} + //if typeName != typeNew && typeName == "" && !this.RemoveTypeMeta && urlLevelType != "" { + // typeName = urlLevelType + // typeNew = urlLevelType + //} - if typeName != typeNew && typeName == "" && !this.RemoveTypeMeta && urlLevelType != "" { - typeName = urlLevelType - typeNew = urlLevelType - } if (actionStr == elastic.ActionIndex || actionStr == elastic.ActionCreate) && (len(id) == 0 || id == "null") && this.FixNilID { randID := util.GetUUID() if this.AddTimestampToID { @@ -110,6 +109,13 @@ func (this *ElasticsearchBulkRequestMutate) Filter(ctx *fasthttp.RequestCtx) { } } + //index should not be empty; otherwise the default index is used. NOTE(review): the assignment below sets idNew, not indexNew - confirm this is intended + if index == "" { + index = this.DefaultIndex + idNew = this.DefaultIndex + } + + //handle the index rename if index != "" && len(this.IndexNameRename) > 0 { v, ok := this.IndexNameRename[index] if ok { @@ -124,6 +130,7 @@ func (this *ElasticsearchBulkRequestMutate) Filter(ctx *fasthttp.RequestCtx) { } } + //handle the type rename if typeName != "" && !this.RemoveTypeMeta && len(this.TypeNameRename) > 0 { v, ok := this.TypeNameRename[typeName] if ok && v != typeName { diff --git a/proxy/filters/elastic/bulk_request_resort.go b/proxy/filters/elastic/bulk_request_resort.go index 3cc19c9..88e2e9c 100644 --- a/proxy/filters/elastic/bulk_request_resort.go +++ b/proxy/filters/elastic/bulk_request_resort.go @@ -312,7 +312,7 @@ func (filter *BulkRequestResort) Filter(ctx *fasthttp.RequestCtx) { docs := map[int][]elastic.VersionInfo{} - elastic.WalkBulkRequests(requestBody, nil, + elastic.WalkBulkRequests(pathStr, requestBody, nil, func(metaBytes []byte, actionStr, index, typeName, id, routing string, docCount int) (err error) { if lastRecord, collect = docsToReplicate[offset]; collect { lastRecord.Payload = append(lastRecord.Payload, bytes.Copy(metaBytes)) diff --git a/proxy/filters/elastic/bulk_reshuffle.go b/proxy/filters/elastic/bulk_reshuffle.go index 9ddd247..68fb346 100644 --- 
a/proxy/filters/elastic/bulk_reshuffle.go +++ b/proxy/filters/elastic/bulk_reshuffle.go @@ -196,7 +196,7 @@ func (this *BulkReshuffle) Filter(ctx *fasthttp.RequestCtx) { var hitMetadataNotFound bool - docCount, err := elastic.WalkBulkRequests(body, func(eachLine []byte) (skipNextLine bool) { + docCount, err := elastic.WalkBulkRequests(pathStr, body, func(eachLine []byte) (skipNextLine bool) { if validEachLine { obj := map[string]interface{}{} err := util.FromJSONBytes(eachLine, &obj) @@ -214,26 +214,26 @@ func (this *BulkReshuffle) Filter(ctx *fasthttp.RequestCtx) { shardID := 0 - var indexNew, typeNew, idNew string - - //only handle empty index - if index == "" { - //url level - var urlLevelIndex string - var urlLevelType string - - urlLevelIndex, urlLevelType = elastic.ParseUrlLevelBulkMeta(pathStr) - - if index == "" && urlLevelIndex != "" { - index = urlLevelIndex - indexNew = urlLevelIndex - } - - if typeName == "" && urlLevelType != "" { - typeName = urlLevelType - typeNew = urlLevelType - } - } + var idNew string + //var indexNew, typeNew, idNew string + ////only handle empty index + //if index == "" { + // //url levelbulk_processor.go:135 + // var urlLevelIndex string + // var urlLevelType string + // + // urlLevelIndex, urlLevelType = elastic.ParseUrlLevelBulkMeta(pathStr) + // + // if index == "" && urlLevelIndex != "" { + // index = urlLevelIndex + // indexNew = urlLevelIndex + // } + // + // if typeName == "" && urlLevelType != "" { + // typeName = urlLevelType + // typeNew = urlLevelType + // } + //} if (actionStr == elastic.ActionIndex || actionStr == elastic.ActionCreate) && (len(id) == 0 || id == "null") && fixNullID { id = util.GetUUID() @@ -243,9 +243,10 @@ func (this *BulkReshuffle) Filter(ctx *fasthttp.RequestCtx) { } } - if indexNew != "" || typeNew != "" || idNew != "" { + //if indexNew != "" || typeNew != "" || idNew != "" { + if idNew != "" { var err error - metaBytes, err = updateJsonWithNewIndex(actionStr, metaBytes, indexNew, typeNew, 
idNew) + metaBytes, err = elastic.UpdateBulkMetadata(actionStr, metaBytes, "", "", idNew) if err != nil { panic(err) } @@ -630,34 +631,3 @@ func batchUpdateJson(scannedByte []byte, action string, set, del map[string]stri return scannedByte, err } - -func updateJsonWithNewIndex(action string, scannedByte []byte, index, typeName, id string) (newBytes []byte, err error) { - - if global.Env().IsDebug { - log.Trace("update:", action, ",", index, ",", typeName, ",", id) - } - - newBytes = make([]byte, len(scannedByte)) - copy(newBytes, scannedByte) - - if index != "" { - newBytes, err = jsonparser.Set(newBytes, []byte("\""+index+"\""), action, "_index") - if err != nil { - return newBytes, err - } - } - if typeName != "" { - newBytes, err = jsonparser.Set(newBytes, []byte("\""+typeName+"\""), action, "_type") - if err != nil { - return newBytes, err - } - } - if id != "" { - newBytes, err = jsonparser.Set(newBytes, []byte("\""+id+"\""), action, "_id") - if err != nil { - return newBytes, err - } - } - - return newBytes, err -} diff --git a/proxy/filters/security/ldap/ldap_test.go b/proxy/filters/security/ldap/ldap_test.go index 7d6b393..3fa878e 100644 --- a/proxy/filters/security/ldap/ldap_test.go +++ b/proxy/filters/security/ldap/ldap_test.go @@ -45,7 +45,6 @@ func TestLDAPFunctions(t *testing.T) { UserFilter: "(uid=%s)", } - r := &fasthttp.Request{} r.SetBasicAuth("galieleo", "password") diff --git a/proxy/filters/throttle/bulk_request_throttle.go b/proxy/filters/throttle/bulk_request_throttle.go index bd7722e..d81cb0c 100644 --- a/proxy/filters/throttle/bulk_request_throttle.go +++ b/proxy/filters/throttle/bulk_request_throttle.go @@ -63,17 +63,17 @@ func (this *ElasticsearchBulkRequestThrottle) Filter(ctx *fasthttp.RequestCtx) { var indexOpStats = map[string]int{} var indexPayloadStats = map[string]int{} - docCount, err := elastic.WalkBulkRequests(body, func(eachLine []byte) (skipNextLine bool) { + docCount, err := elastic.WalkBulkRequests(pathStr, body, 
func(eachLine []byte) (skipNextLine bool) { return false }, func(metaBytes []byte, actionStr, index, typeName, id, routing string, offset int) (err error) { - if index == "" { - //url level - var urlLevelIndex string - urlLevelIndex, _ = elastic.ParseUrlLevelBulkMeta(pathStr) - if urlLevelIndex != "" { - index = urlLevelIndex - } - } + //if index == "" { + // //url level + // var urlLevelIndex string + // urlLevelIndex, _ = elastic.ParseUrlLevelBulkMeta(pathStr) + // if urlLevelIndex != "" { + // index = urlLevelIndex + // } + //} //stats v, ok := indexOpStats[index] From f90ed2251cdfd602fa491c4ce45f7b5a5c95410d Mon Sep 17 00:00:00 2001 From: medcl Date: Mon, 13 Jan 2025 17:03:53 +0800 Subject: [PATCH 2/5] chore: cleanup comments --- proxy/filters/elastic/bulk_request_mutate.go | 14 ------------- proxy/filters/elastic/bulk_request_resort.go | 2 -- proxy/filters/elastic/bulk_reshuffle.go | 20 ------------------- .../filters/throttle/bulk_request_throttle.go | 9 --------- 4 files changed, 45 deletions(-) diff --git a/proxy/filters/elastic/bulk_request_mutate.go b/proxy/filters/elastic/bulk_request_mutate.go index bfbfb4b..63c63a4 100644 --- a/proxy/filters/elastic/bulk_request_mutate.go +++ b/proxy/filters/elastic/bulk_request_mutate.go @@ -72,21 +72,7 @@ func (this *ElasticsearchBulkRequestMutate) Filter(ctx *fasthttp.RequestCtx) { metaCollected = false metaStr := util.UnsafeBytesToString(metaBytes) - - //url level - //var urlLevelIndex string - //var urlLevelType string - //urlLevelIndex, urlLevelType = elastic.ParseUrlLevelBulkMeta(pathStr) - var indexNew, typeNew, idNew string - //if index == "" && urlLevelIndex != "" { - // index = urlLevelIndex - // indexNew = urlLevelIndex - //} - //if typeName != typeNew && typeName == "" && !this.RemoveTypeMeta && urlLevelType != "" { - // typeName = urlLevelType - // typeNew = urlLevelType - //} if (actionStr == elastic.ActionIndex || actionStr == elastic.ActionCreate) && (len(id) == 0 || id == "null") && this.FixNilID { 
randID := util.GetUUID() diff --git a/proxy/filters/elastic/bulk_request_resort.go b/proxy/filters/elastic/bulk_request_resort.go index 88e2e9c..7270fae 100644 --- a/proxy/filters/elastic/bulk_request_resort.go +++ b/proxy/filters/elastic/bulk_request_resort.go @@ -414,8 +414,6 @@ func (s *Sorter) run() { if docCount > 0 { - //log.Error("docCount:", docCount, " fetch documents:", len(documents)) - //if global.Env().IsDebug { stats.IncrementBy("bulk_request_resort", fmt.Sprintf("%v_%v_get", s.filter.inputQueueConfig.ID, s.partitionID), int64(docCount)) //} diff --git a/proxy/filters/elastic/bulk_reshuffle.go b/proxy/filters/elastic/bulk_reshuffle.go index 68fb346..c3b9186 100644 --- a/proxy/filters/elastic/bulk_reshuffle.go +++ b/proxy/filters/elastic/bulk_reshuffle.go @@ -215,25 +215,6 @@ func (this *BulkReshuffle) Filter(ctx *fasthttp.RequestCtx) { shardID := 0 var idNew string - //var indexNew, typeNew, idNew string - ////only handle empty index - //if index == "" { - // //url levelbulk_processor.go:135 - // var urlLevelIndex string - // var urlLevelType string - // - // urlLevelIndex, urlLevelType = elastic.ParseUrlLevelBulkMeta(pathStr) - // - // if index == "" && urlLevelIndex != "" { - // index = urlLevelIndex - // indexNew = urlLevelIndex - // } - // - // if typeName == "" && urlLevelType != "" { - // typeName = urlLevelType - // typeNew = urlLevelType - // } - //} if (actionStr == elastic.ActionIndex || actionStr == elastic.ActionCreate) && (len(id) == 0 || id == "null") && fixNullID { id = util.GetUUID() @@ -243,7 +224,6 @@ func (this *BulkReshuffle) Filter(ctx *fasthttp.RequestCtx) { } } - //if indexNew != "" || typeNew != "" || idNew != "" { if idNew != "" { var err error metaBytes, err = elastic.UpdateBulkMetadata(actionStr, metaBytes, "", "", idNew) diff --git a/proxy/filters/throttle/bulk_request_throttle.go b/proxy/filters/throttle/bulk_request_throttle.go index d81cb0c..291768f 100644 --- a/proxy/filters/throttle/bulk_request_throttle.go +++ 
b/proxy/filters/throttle/bulk_request_throttle.go @@ -66,15 +66,6 @@ func (this *ElasticsearchBulkRequestThrottle) Filter(ctx *fasthttp.RequestCtx) { docCount, err := elastic.WalkBulkRequests(pathStr, body, func(eachLine []byte) (skipNextLine bool) { return false }, func(metaBytes []byte, actionStr, index, typeName, id, routing string, offset int) (err error) { - //if index == "" { - // //url level - // var urlLevelIndex string - // urlLevelIndex, _ = elastic.ParseUrlLevelBulkMeta(pathStr) - // if urlLevelIndex != "" { - // index = urlLevelIndex - // } - //} - //stats v, ok := indexOpStats[index] if !ok { From 69223e3c77604089ca90db4c414565fa05f474ed Mon Sep 17 00:00:00 2001 From: medcl Date: Wed, 15 Jan 2025 14:11:14 +0800 Subject: [PATCH 3/5] chore: remove unused tests --- proxy/filters/elastic/bulk_reshuffle_test.go | 119 ------------------- 1 file changed, 119 deletions(-) diff --git a/proxy/filters/elastic/bulk_reshuffle_test.go b/proxy/filters/elastic/bulk_reshuffle_test.go index 38f7fb0..0d8cbd6 100644 --- a/proxy/filters/elastic/bulk_reshuffle_test.go +++ b/proxy/filters/elastic/bulk_reshuffle_test.go @@ -72,125 +72,6 @@ func TestBulkReshuffle_MixedRequests(t *testing.T) { fmt.Println(string(data)) } -func TestParseActionMeta1(t *testing.T) { - - data := []byte("{\"index\":{\"_index\":\"medcl1\",\"_type\":\"_doc\",\"_id\":\"GZq-bnYBC53QmW9Kk2ve\"}}") - action := util.ExtractFieldFromBytes(&data, elastic.ActionStart, elastic.ActionEnd, nil) - fmt.Println(string(action)) - indexb, _, _, _ := jsonparser.Get(data, util.UnsafeBytesToString(action), "_index") - fmt.Println(string(indexb)) - assert.Equal(t, string(action), "index") - assert.Equal(t, string(indexb), "medcl1") - idb, _, _, _ := jsonparser.Get(data, util.UnsafeBytesToString(action), "_id") - assert.Equal(t, string(idb), "GZq-bnYBC53QmW9Kk2ve") - - //update json bytes - new, _ := jsonparser.Set(data, []byte("medcl2"), "index", "_index") - fmt.Println("new:", string(new)) - -} - -func 
TestParseActionMeta2(t *testing.T) { - - data := []byte("{\"index\":{\"_index\":\"medcl1\",\"_type\":\"_doc\",\"_id\":\"GZq-bnYBC53QmW9Kk2ve\"}}") - - action, indexb, typeb, idb, _, _ := elastic.ParseActionMeta(data) - fmt.Println(string(action), string(indexb), string(idb)) - assert.Equal(t, string(action), "index") - assert.Equal(t, string(indexb), "medcl1") - assert.Equal(t, string(typeb), "_doc") - assert.Equal(t, string(idb), "GZq-bnYBC53QmW9Kk2ve") - - data = []byte("{\"index\":{\"_type\":\"_doc\",\"_id\":\"GZq-bnYBC53QmW9Kk2ve\",\"_index\":\"medcl1\"}}") - - action, indexb, typeb, idb, _, _ = elastic.ParseActionMeta(data) - - fmt.Println(string(action), string(indexb), string(idb)) - assert.Equal(t, string(action), "index") - assert.Equal(t, string(indexb), "medcl1") - assert.Equal(t, string(typeb), "_doc") - assert.Equal(t, string(idb), "GZq-bnYBC53QmW9Kk2ve") - - data = []byte("{\"index\":{\"_id\":\"GZq-bnYBC53QmW9Kk2ve\",\"_type\":\"_doc\",\"_index\":\"medcl1\"}}") - - action, indexb, typeb, idb, _, _ = elastic.ParseActionMeta(data) - - fmt.Println(string(action), string(indexb), string(idb)) - assert.Equal(t, string(action), "index") - assert.Equal(t, string(indexb), "medcl1") - assert.Equal(t, string(typeb), "_doc") - assert.Equal(t, string(idb), "GZq-bnYBC53QmW9Kk2ve") - - data = []byte("{\"index\":{\"_index\":\"test\",\"_type\":\"doc\"}}") - action, indexb, typeb, idb, _, _ = elastic.ParseActionMeta(data) - - fmt.Println(string(action), string(indexb), string(idb)) - assert.Equal(t, string(action), "index") - assert.Equal(t, string(indexb), "test") - assert.Equal(t, string(typeb), "doc") - assert.Equal(t, string(idb), "") - - data = []byte("{\"delete\":{\"_index\":\"test\",\"_type\":\"_doc\"}}") - action, indexb, typeb, idb, _, _ = elastic.ParseActionMeta(data) - - fmt.Println(string(action), string(indexb), string(idb)) - assert.Equal(t, string(action), "delete") - assert.Equal(t, string(indexb), "test") - assert.Equal(t, string(typeb), "_doc") - 
assert.Equal(t, string(idb), "") - - data = []byte("{\"create\":{\"_index\":\"test\",\"_type\":\"_doc\"}}") - action, indexb, typeb, idb, _, _ = elastic.ParseActionMeta(data) - - fmt.Println(string(action), string(indexb), string(idb)) - assert.Equal(t, string(action), "create") - assert.Equal(t, string(indexb), "test") - assert.Equal(t, string(typeb), "_doc") - assert.Equal(t, string(idb), "") - - data = []byte("{ \"update\" : {\"_id\" : \"1\", \"_index\" : \"test\"} }") - action, indexb, typeb, idb, _, _ = elastic.ParseActionMeta(data) - - fmt.Println(string(action), string(indexb), string(idb)) - assert.Equal(t, string(action), "update") - assert.Equal(t, string(indexb), "test") - assert.Equal(t, string(typeb), "") - assert.Equal(t, string(idb), "1") - - data = []byte("{ \"update\" : {\"_index\" : \"test\"} }") - action, indexb, typeb, idb, _, _ = elastic.ParseActionMeta(data) - - fmt.Println(string(action), string(indexb), string(idb)) - assert.Equal(t, string(action), "update") - assert.Equal(t, string(indexb), "test") - assert.Equal(t, string(typeb), "") - assert.Equal(t, string(idb), "") - -} - -func TestParseActionMeta3(t *testing.T) { - - data := []byte("{\"index\":{\"_index\":\"medcl1\",\"_type\":\"_doc\",\"_id\":\"GZq-bnYBC53QmW9Kk2ve\"}}") - newData, err := updateJsonWithNewIndex("index", data, "newIndex", "newType", "newId") - fmt.Println(err, string(newData)) - assert.Equal(t, string(newData), "{\"index\":{\"_index\":\"newIndex\",\"_type\":\"newType\",\"_id\":\"newId\"}}") - - data = []byte("{\"index\":{\"_index\":\"medcl1\",\"_id\":\"GZq-bnYBC53QmW9Kk2ve\"}}") - newData, err = updateJsonWithNewIndex("index", data, "newIndex", "newType", "newId") - fmt.Println(err, string(newData)) - assert.Equal(t, string(newData), "{\"index\":{\"_index\":\"newIndex\",\"_id\":\"newId\",\"_type\":\"newType\"}}") - - data = []byte("{\"index\":{\"_index\":\"medcl1\",\"_type\":\"doc1\"}}") - newData, err = updateJsonWithNewIndex("index", data, "newIndex", "newType", 
"newId") - fmt.Println(err, string(newData)) - assert.Equal(t, string(newData), "{\"index\":{\"_index\":\"newIndex\",\"_type\":\"newType\",\"_id\":\"newId\"}}") - - data = []byte("{\"index\":{\"_index\":\"medcl1\",\"_type\":\"doc1\"}}") - newData, err = updateJsonWithNewIndex("index", data, "", "", "newId") - fmt.Println(err, string(newData)) - assert.Equal(t, string(newData), "{\"index\":{\"_index\":\"medcl1\",\"_type\":\"doc1\",\"_id\":\"newId\"}}") -} - func TestGetUrlLevelMeta(t *testing.T) { pathStr := "/index/_bulk" From ff6aeba37332f118292ae0790d87d74bfc9c21a1 Mon Sep 17 00:00:00 2001 From: medcl Date: Wed, 15 Jan 2025 14:13:12 +0800 Subject: [PATCH 4/5] chore: remove unused tests --- docs/content.en/docs/release-notes/_index.md | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/content.en/docs/release-notes/_index.md b/docs/content.en/docs/release-notes/_index.md index 2cf0fb2..7018405 100644 --- a/docs/content.en/docs/release-notes/_index.md +++ b/docs/content.en/docs/release-notes/_index.md @@ -12,6 +12,7 @@ Information about release notes of INFINI Gateway is provided here. ### Breaking changes ### Features +- Support simple bulk metadata in bulk related filters (#59) ### Bug fix From 249666215d43c6a8959f07bde80ac7ee455646a9 Mon Sep 17 00:00:00 2001 From: medcl Date: Wed, 15 Jan 2025 14:22:55 +0800 Subject: [PATCH 5/5] chore: remove unused import --- proxy/filters/elastic/bulk_reshuffle_test.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/proxy/filters/elastic/bulk_reshuffle_test.go b/proxy/filters/elastic/bulk_reshuffle_test.go index 0d8cbd6..dfeabe3 100644 --- a/proxy/filters/elastic/bulk_reshuffle_test.go +++ b/proxy/filters/elastic/bulk_reshuffle_test.go @@ -25,10 +25,8 @@ package elastic import ( "fmt" - "github.com/buger/jsonparser" "github.com/magiconair/properties/assert" "infini.sh/framework/core/elastic" - "infini.sh/framework/core/util" "strings" "testing" )