Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support simple bulk metadata in bulk related filters #59

Merged
merged 6 commits into from
Jan 15, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/content.en/docs/release-notes/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ Information about release notes of INFINI Gateway is provided here.
### Breaking changes

### Features
- Support simple bulk metadata in bulk related filters (#59)

### Bug fix

Expand Down
25 changes: 9 additions & 16 deletions proxy/filters/elastic/bulk_request_mutate.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,29 +66,14 @@ func (this *ElasticsearchBulkRequestMutate) Filter(ctx *fasthttp.RequestCtx) {
var bulkBuff *bytebufferpool.ByteBuffer = bytebufferpool.Get("bulk_mutate_request_docs")
defer bytebufferpool.Put("bulk_mutate_request_docs", bulkBuff)
var metaCollected bool
docCount, err := elastic.WalkBulkRequests(body, func(eachLine []byte) (skipNextLine bool) {
docCount, err := elastic.WalkBulkRequests(pathStr, body, func(eachLine []byte) (skipNextLine bool) {
return false
}, func(metaBytes []byte, actionStr, index, typeName, id, routing string, offset int) (err error) {
metaCollected = false

metaStr := util.UnsafeBytesToString(metaBytes)

//url level
var urlLevelIndex string
var urlLevelType string

urlLevelIndex, urlLevelType = elastic.ParseUrlLevelBulkMeta(pathStr)

var indexNew, typeNew, idNew string
if index == "" && urlLevelIndex != "" {
index = urlLevelIndex
indexNew = urlLevelIndex
}

if typeName != typeNew && typeName == "" && !this.RemoveTypeMeta && urlLevelType != "" {
typeName = urlLevelType
typeNew = urlLevelType
}
if (actionStr == elastic.ActionIndex || actionStr == elastic.ActionCreate) && (len(id) == 0 || id == "null") && this.FixNilID {
randID := util.GetUUID()
if this.AddTimestampToID {
Expand All @@ -110,6 +95,13 @@ func (this *ElasticsearchBulkRequestMutate) Filter(ctx *fasthttp.RequestCtx) {
}
}

//index should not be empty, or will be use the default index
if index == "" {
index = this.DefaultIndex
idNew = this.DefaultIndex
}

//handle the index rename
if index != "" && len(this.IndexNameRename) > 0 {
v, ok := this.IndexNameRename[index]
if ok {
Expand All @@ -124,6 +116,7 @@ func (this *ElasticsearchBulkRequestMutate) Filter(ctx *fasthttp.RequestCtx) {
}
}

//handle the type rename
if typeName != "" && !this.RemoveTypeMeta && len(this.TypeNameRename) > 0 {
v, ok := this.TypeNameRename[typeName]
if ok && v != typeName {
Expand Down
4 changes: 1 addition & 3 deletions proxy/filters/elastic/bulk_request_resort.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ func (filter *BulkRequestResort) Filter(ctx *fasthttp.RequestCtx) {

docs := map[int][]elastic.VersionInfo{}

elastic.WalkBulkRequests(requestBody, nil,
elastic.WalkBulkRequests(pathStr, requestBody, nil,
func(metaBytes []byte, actionStr, index, typeName, id, routing string, docCount int) (err error) {
if lastRecord, collect = docsToReplicate[offset]; collect {
lastRecord.Payload = append(lastRecord.Payload, bytes.Copy(metaBytes))
Expand Down Expand Up @@ -414,8 +414,6 @@ func (s *Sorter) run() {

if docCount > 0 {

//log.Error("docCount:", docCount, " fetch documents:", len(documents))

//if global.Env().IsDebug {
stats.IncrementBy("bulk_request_resort", fmt.Sprintf("%v_%v_get", s.filter.inputQueueConfig.ID, s.partitionID), int64(docCount))
//}
Expand Down
58 changes: 4 additions & 54 deletions proxy/filters/elastic/bulk_reshuffle.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ func (this *BulkReshuffle) Filter(ctx *fasthttp.RequestCtx) {

var hitMetadataNotFound bool

docCount, err := elastic.WalkBulkRequests(body, func(eachLine []byte) (skipNextLine bool) {
docCount, err := elastic.WalkBulkRequests(pathStr, body, func(eachLine []byte) (skipNextLine bool) {
if validEachLine {
obj := map[string]interface{}{}
err := util.FromJSONBytes(eachLine, &obj)
Expand All @@ -214,26 +214,7 @@ func (this *BulkReshuffle) Filter(ctx *fasthttp.RequestCtx) {

shardID := 0

var indexNew, typeNew, idNew string

//only handle empty index
if index == "" {
//url level
var urlLevelIndex string
var urlLevelType string

urlLevelIndex, urlLevelType = elastic.ParseUrlLevelBulkMeta(pathStr)

if index == "" && urlLevelIndex != "" {
index = urlLevelIndex
indexNew = urlLevelIndex
}

if typeName == "" && urlLevelType != "" {
typeName = urlLevelType
typeNew = urlLevelType
}
}
var idNew string

if (actionStr == elastic.ActionIndex || actionStr == elastic.ActionCreate) && (len(id) == 0 || id == "null") && fixNullID {
id = util.GetUUID()
Expand All @@ -243,9 +224,9 @@ func (this *BulkReshuffle) Filter(ctx *fasthttp.RequestCtx) {
}
}

if indexNew != "" || typeNew != "" || idNew != "" {
if idNew != "" {
var err error
metaBytes, err = updateJsonWithNewIndex(actionStr, metaBytes, indexNew, typeNew, idNew)
metaBytes, err = elastic.UpdateBulkMetadata(actionStr, metaBytes, "", "", idNew)
if err != nil {
panic(err)
}
Expand Down Expand Up @@ -630,34 +611,3 @@ func batchUpdateJson(scannedByte []byte, action string, set, del map[string]stri

return scannedByte, err
}

func updateJsonWithNewIndex(action string, scannedByte []byte, index, typeName, id string) (newBytes []byte, err error) {

if global.Env().IsDebug {
log.Trace("update:", action, ",", index, ",", typeName, ",", id)
}

newBytes = make([]byte, len(scannedByte))
copy(newBytes, scannedByte)

if index != "" {
newBytes, err = jsonparser.Set(newBytes, []byte("\""+index+"\""), action, "_index")
if err != nil {
return newBytes, err
}
}
if typeName != "" {
newBytes, err = jsonparser.Set(newBytes, []byte("\""+typeName+"\""), action, "_type")
if err != nil {
return newBytes, err
}
}
if id != "" {
newBytes, err = jsonparser.Set(newBytes, []byte("\""+id+"\""), action, "_id")
if err != nil {
return newBytes, err
}
}

return newBytes, err
}
121 changes: 0 additions & 121 deletions proxy/filters/elastic/bulk_reshuffle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,8 @@ package elastic

import (
"fmt"
"github.com/buger/jsonparser"
"github.com/magiconair/properties/assert"
"infini.sh/framework/core/elastic"
"infini.sh/framework/core/util"
"strings"
"testing"
)
Expand Down Expand Up @@ -72,125 +70,6 @@ func TestBulkReshuffle_MixedRequests(t *testing.T) {
fmt.Println(string(data))
}

func TestParseActionMeta1(t *testing.T) {

data := []byte("{\"index\":{\"_index\":\"medcl1\",\"_type\":\"_doc\",\"_id\":\"GZq-bnYBC53QmW9Kk2ve\"}}")
action := util.ExtractFieldFromBytes(&data, elastic.ActionStart, elastic.ActionEnd, nil)
fmt.Println(string(action))
indexb, _, _, _ := jsonparser.Get(data, util.UnsafeBytesToString(action), "_index")
fmt.Println(string(indexb))
assert.Equal(t, string(action), "index")
assert.Equal(t, string(indexb), "medcl1")
idb, _, _, _ := jsonparser.Get(data, util.UnsafeBytesToString(action), "_id")
assert.Equal(t, string(idb), "GZq-bnYBC53QmW9Kk2ve")

//update json bytes
new, _ := jsonparser.Set(data, []byte("medcl2"), "index", "_index")
fmt.Println("new:", string(new))

}

func TestParseActionMeta2(t *testing.T) {

data := []byte("{\"index\":{\"_index\":\"medcl1\",\"_type\":\"_doc\",\"_id\":\"GZq-bnYBC53QmW9Kk2ve\"}}")

action, indexb, typeb, idb, _, _ := elastic.ParseActionMeta(data)
fmt.Println(string(action), string(indexb), string(idb))
assert.Equal(t, string(action), "index")
assert.Equal(t, string(indexb), "medcl1")
assert.Equal(t, string(typeb), "_doc")
assert.Equal(t, string(idb), "GZq-bnYBC53QmW9Kk2ve")

data = []byte("{\"index\":{\"_type\":\"_doc\",\"_id\":\"GZq-bnYBC53QmW9Kk2ve\",\"_index\":\"medcl1\"}}")

action, indexb, typeb, idb, _, _ = elastic.ParseActionMeta(data)

fmt.Println(string(action), string(indexb), string(idb))
assert.Equal(t, string(action), "index")
assert.Equal(t, string(indexb), "medcl1")
assert.Equal(t, string(typeb), "_doc")
assert.Equal(t, string(idb), "GZq-bnYBC53QmW9Kk2ve")

data = []byte("{\"index\":{\"_id\":\"GZq-bnYBC53QmW9Kk2ve\",\"_type\":\"_doc\",\"_index\":\"medcl1\"}}")

action, indexb, typeb, idb, _, _ = elastic.ParseActionMeta(data)

fmt.Println(string(action), string(indexb), string(idb))
assert.Equal(t, string(action), "index")
assert.Equal(t, string(indexb), "medcl1")
assert.Equal(t, string(typeb), "_doc")
assert.Equal(t, string(idb), "GZq-bnYBC53QmW9Kk2ve")

data = []byte("{\"index\":{\"_index\":\"test\",\"_type\":\"doc\"}}")
action, indexb, typeb, idb, _, _ = elastic.ParseActionMeta(data)

fmt.Println(string(action), string(indexb), string(idb))
assert.Equal(t, string(action), "index")
assert.Equal(t, string(indexb), "test")
assert.Equal(t, string(typeb), "doc")
assert.Equal(t, string(idb), "")

data = []byte("{\"delete\":{\"_index\":\"test\",\"_type\":\"_doc\"}}")
action, indexb, typeb, idb, _, _ = elastic.ParseActionMeta(data)

fmt.Println(string(action), string(indexb), string(idb))
assert.Equal(t, string(action), "delete")
assert.Equal(t, string(indexb), "test")
assert.Equal(t, string(typeb), "_doc")
assert.Equal(t, string(idb), "")

data = []byte("{\"create\":{\"_index\":\"test\",\"_type\":\"_doc\"}}")
action, indexb, typeb, idb, _, _ = elastic.ParseActionMeta(data)

fmt.Println(string(action), string(indexb), string(idb))
assert.Equal(t, string(action), "create")
assert.Equal(t, string(indexb), "test")
assert.Equal(t, string(typeb), "_doc")
assert.Equal(t, string(idb), "")

data = []byte("{ \"update\" : {\"_id\" : \"1\", \"_index\" : \"test\"} }")
action, indexb, typeb, idb, _, _ = elastic.ParseActionMeta(data)

fmt.Println(string(action), string(indexb), string(idb))
assert.Equal(t, string(action), "update")
assert.Equal(t, string(indexb), "test")
assert.Equal(t, string(typeb), "")
assert.Equal(t, string(idb), "1")

data = []byte("{ \"update\" : {\"_index\" : \"test\"} }")
action, indexb, typeb, idb, _, _ = elastic.ParseActionMeta(data)

fmt.Println(string(action), string(indexb), string(idb))
assert.Equal(t, string(action), "update")
assert.Equal(t, string(indexb), "test")
assert.Equal(t, string(typeb), "")
assert.Equal(t, string(idb), "")

}

func TestParseActionMeta3(t *testing.T) {

data := []byte("{\"index\":{\"_index\":\"medcl1\",\"_type\":\"_doc\",\"_id\":\"GZq-bnYBC53QmW9Kk2ve\"}}")
newData, err := updateJsonWithNewIndex("index", data, "newIndex", "newType", "newId")
fmt.Println(err, string(newData))
assert.Equal(t, string(newData), "{\"index\":{\"_index\":\"newIndex\",\"_type\":\"newType\",\"_id\":\"newId\"}}")

data = []byte("{\"index\":{\"_index\":\"medcl1\",\"_id\":\"GZq-bnYBC53QmW9Kk2ve\"}}")
newData, err = updateJsonWithNewIndex("index", data, "newIndex", "newType", "newId")
fmt.Println(err, string(newData))
assert.Equal(t, string(newData), "{\"index\":{\"_index\":\"newIndex\",\"_id\":\"newId\",\"_type\":\"newType\"}}")

data = []byte("{\"index\":{\"_index\":\"medcl1\",\"_type\":\"doc1\"}}")
newData, err = updateJsonWithNewIndex("index", data, "newIndex", "newType", "newId")
fmt.Println(err, string(newData))
assert.Equal(t, string(newData), "{\"index\":{\"_index\":\"newIndex\",\"_type\":\"newType\",\"_id\":\"newId\"}}")

data = []byte("{\"index\":{\"_index\":\"medcl1\",\"_type\":\"doc1\"}}")
newData, err = updateJsonWithNewIndex("index", data, "", "", "newId")
fmt.Println(err, string(newData))
assert.Equal(t, string(newData), "{\"index\":{\"_index\":\"medcl1\",\"_type\":\"doc1\",\"_id\":\"newId\"}}")
}

func TestGetUrlLevelMeta(t *testing.T) {

pathStr := "/index/_bulk"
Expand Down
1 change: 0 additions & 1 deletion proxy/filters/security/ldap/ldap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ func TestLDAPFunctions(t *testing.T) {
UserFilter: "(uid=%s)",
}


r := &fasthttp.Request{}
r.SetBasicAuth("galieleo", "password")

Expand Down
11 changes: 1 addition & 10 deletions proxy/filters/throttle/bulk_request_throttle.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,18 +63,9 @@ func (this *ElasticsearchBulkRequestThrottle) Filter(ctx *fasthttp.RequestCtx) {
var indexOpStats = map[string]int{}
var indexPayloadStats = map[string]int{}

docCount, err := elastic.WalkBulkRequests(body, func(eachLine []byte) (skipNextLine bool) {
docCount, err := elastic.WalkBulkRequests(pathStr, body, func(eachLine []byte) (skipNextLine bool) {
return false
}, func(metaBytes []byte, actionStr, index, typeName, id, routing string, offset int) (err error) {
if index == "" {
//url level
var urlLevelIndex string
urlLevelIndex, _ = elastic.ParseUrlLevelBulkMeta(pathStr)
if urlLevelIndex != "" {
index = urlLevelIndex
}
}

//stats
v, ok := indexOpStats[index]
if !ok {
Expand Down
Loading