Skip to content

Commit

Permalink
feat: support simple bulk metadata
Browse files Browse the repository at this point in the history
  • Loading branch information
medcl committed Jan 13, 2025
1 parent cdac72c commit 0776429
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 78 deletions.
33 changes: 20 additions & 13 deletions proxy/filters/elastic/bulk_request_mutate.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,29 +66,28 @@ func (this *ElasticsearchBulkRequestMutate) Filter(ctx *fasthttp.RequestCtx) {
var bulkBuff *bytebufferpool.ByteBuffer = bytebufferpool.Get("bulk_mutate_request_docs")
defer bytebufferpool.Put("bulk_mutate_request_docs", bulkBuff)
var metaCollected bool
docCount, err := elastic.WalkBulkRequests(body, func(eachLine []byte) (skipNextLine bool) {
docCount, err := elastic.WalkBulkRequests(pathStr, body, func(eachLine []byte) (skipNextLine bool) {
return false
}, func(metaBytes []byte, actionStr, index, typeName, id, routing string, offset int) (err error) {
metaCollected = false

metaStr := util.UnsafeBytesToString(metaBytes)

//url level
var urlLevelIndex string
var urlLevelType string

urlLevelIndex, urlLevelType = elastic.ParseUrlLevelBulkMeta(pathStr)
//var urlLevelIndex string
//var urlLevelType string
//urlLevelIndex, urlLevelType = elastic.ParseUrlLevelBulkMeta(pathStr)

var indexNew, typeNew, idNew string
if index == "" && urlLevelIndex != "" {
index = urlLevelIndex
indexNew = urlLevelIndex
}
//if index == "" && urlLevelIndex != "" {
// index = urlLevelIndex
// indexNew = urlLevelIndex
//}
//if typeName != typeNew && typeName == "" && !this.RemoveTypeMeta && urlLevelType != "" {
// typeName = urlLevelType
// typeNew = urlLevelType
//}

if typeName != typeNew && typeName == "" && !this.RemoveTypeMeta && urlLevelType != "" {
typeName = urlLevelType
typeNew = urlLevelType
}
if (actionStr == elastic.ActionIndex || actionStr == elastic.ActionCreate) && (len(id) == 0 || id == "null") && this.FixNilID {
randID := util.GetUUID()
if this.AddTimestampToID {
Expand All @@ -110,6 +109,13 @@ func (this *ElasticsearchBulkRequestMutate) Filter(ctx *fasthttp.RequestCtx) {
}
}

//index should not be empty, or will be use the default index
if index == "" {
index = this.DefaultIndex
idNew = this.DefaultIndex
}

//handle the index rename
if index != "" && len(this.IndexNameRename) > 0 {
v, ok := this.IndexNameRename[index]
if ok {
Expand All @@ -124,6 +130,7 @@ func (this *ElasticsearchBulkRequestMutate) Filter(ctx *fasthttp.RequestCtx) {
}
}

//handle the type rename
if typeName != "" && !this.RemoveTypeMeta && len(this.TypeNameRename) > 0 {
v, ok := this.TypeNameRename[typeName]
if ok && v != typeName {
Expand Down
2 changes: 1 addition & 1 deletion proxy/filters/elastic/bulk_request_resort.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ func (filter *BulkRequestResort) Filter(ctx *fasthttp.RequestCtx) {

docs := map[int][]elastic.VersionInfo{}

elastic.WalkBulkRequests(requestBody, nil,
elastic.WalkBulkRequests(pathStr, requestBody, nil,
func(metaBytes []byte, actionStr, index, typeName, id, routing string, docCount int) (err error) {
if lastRecord, collect = docsToReplicate[offset]; collect {
lastRecord.Payload = append(lastRecord.Payload, bytes.Copy(metaBytes))
Expand Down
78 changes: 24 additions & 54 deletions proxy/filters/elastic/bulk_reshuffle.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ func (this *BulkReshuffle) Filter(ctx *fasthttp.RequestCtx) {

var hitMetadataNotFound bool

docCount, err := elastic.WalkBulkRequests(body, func(eachLine []byte) (skipNextLine bool) {
docCount, err := elastic.WalkBulkRequests(pathStr, body, func(eachLine []byte) (skipNextLine bool) {
if validEachLine {
obj := map[string]interface{}{}
err := util.FromJSONBytes(eachLine, &obj)
Expand All @@ -214,26 +214,26 @@ func (this *BulkReshuffle) Filter(ctx *fasthttp.RequestCtx) {

shardID := 0

var indexNew, typeNew, idNew string

//only handle empty index
if index == "" {
//url level
var urlLevelIndex string
var urlLevelType string

urlLevelIndex, urlLevelType = elastic.ParseUrlLevelBulkMeta(pathStr)

if index == "" && urlLevelIndex != "" {
index = urlLevelIndex
indexNew = urlLevelIndex
}

if typeName == "" && urlLevelType != "" {
typeName = urlLevelType
typeNew = urlLevelType
}
}
var idNew string
//var indexNew, typeNew, idNew string
////only handle empty index
//if index == "" {
// //url levelbulk_processor.go:135
// var urlLevelIndex string
// var urlLevelType string
//
// urlLevelIndex, urlLevelType = elastic.ParseUrlLevelBulkMeta(pathStr)
//
// if index == "" && urlLevelIndex != "" {
// index = urlLevelIndex
// indexNew = urlLevelIndex
// }
//
// if typeName == "" && urlLevelType != "" {
// typeName = urlLevelType
// typeNew = urlLevelType
// }
//}

if (actionStr == elastic.ActionIndex || actionStr == elastic.ActionCreate) && (len(id) == 0 || id == "null") && fixNullID {
id = util.GetUUID()
Expand All @@ -243,9 +243,10 @@ func (this *BulkReshuffle) Filter(ctx *fasthttp.RequestCtx) {
}
}

if indexNew != "" || typeNew != "" || idNew != "" {
//if indexNew != "" || typeNew != "" || idNew != "" {
if idNew != "" {
var err error
metaBytes, err = updateJsonWithNewIndex(actionStr, metaBytes, indexNew, typeNew, idNew)
metaBytes, err = elastic.UpdateBulkMetadata(actionStr, metaBytes, "", "", idNew)
if err != nil {
panic(err)
}
Expand Down Expand Up @@ -630,34 +631,3 @@ func batchUpdateJson(scannedByte []byte, action string, set, del map[string]stri

return scannedByte, err
}

func updateJsonWithNewIndex(action string, scannedByte []byte, index, typeName, id string) (newBytes []byte, err error) {

if global.Env().IsDebug {
log.Trace("update:", action, ",", index, ",", typeName, ",", id)
}

newBytes = make([]byte, len(scannedByte))
copy(newBytes, scannedByte)

if index != "" {
newBytes, err = jsonparser.Set(newBytes, []byte("\""+index+"\""), action, "_index")
if err != nil {
return newBytes, err
}
}
if typeName != "" {
newBytes, err = jsonparser.Set(newBytes, []byte("\""+typeName+"\""), action, "_type")
if err != nil {
return newBytes, err
}
}
if id != "" {
newBytes, err = jsonparser.Set(newBytes, []byte("\""+id+"\""), action, "_id")
if err != nil {
return newBytes, err
}
}

return newBytes, err
}
1 change: 0 additions & 1 deletion proxy/filters/security/ldap/ldap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ func TestLDAPFunctions(t *testing.T) {
UserFilter: "(uid=%s)",
}


r := &fasthttp.Request{}
r.SetBasicAuth("galieleo", "password")

Expand Down
18 changes: 9 additions & 9 deletions proxy/filters/throttle/bulk_request_throttle.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,17 +63,17 @@ func (this *ElasticsearchBulkRequestThrottle) Filter(ctx *fasthttp.RequestCtx) {
var indexOpStats = map[string]int{}
var indexPayloadStats = map[string]int{}

docCount, err := elastic.WalkBulkRequests(body, func(eachLine []byte) (skipNextLine bool) {
docCount, err := elastic.WalkBulkRequests(pathStr, body, func(eachLine []byte) (skipNextLine bool) {
return false
}, func(metaBytes []byte, actionStr, index, typeName, id, routing string, offset int) (err error) {
if index == "" {
//url level
var urlLevelIndex string
urlLevelIndex, _ = elastic.ParseUrlLevelBulkMeta(pathStr)
if urlLevelIndex != "" {
index = urlLevelIndex
}
}
//if index == "" {
// //url level
// var urlLevelIndex string
// urlLevelIndex, _ = elastic.ParseUrlLevelBulkMeta(pathStr)
// if urlLevelIndex != "" {
// index = urlLevelIndex
// }
//}

//stats
v, ok := indexOpStats[index]
Expand Down

0 comments on commit 0776429

Please sign in to comment.