Skip to content

Commit

Permalink
chore: handle simplified bulk metadata, parse index from url path (#59)
Browse files Browse the repository at this point in the history
* chore: handle simplified bulk metadata, parse index from url path
* chore: cleanup comments
* update release notes
  • Loading branch information
medcl authored Jan 15, 2025
1 parent 8e4859d commit 68ff853
Show file tree
Hide file tree
Showing 7 changed files with 256 additions and 24 deletions.
50 changes: 30 additions & 20 deletions core/elastic/bulk_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@
package elastic

import (
"fmt"
"github.com/buger/jsonparser"
"infini.sh/framework/core/errors"
)

var ActionIndex = "index"
Expand All @@ -40,35 +40,45 @@ var ActionUpdate = "update"
var ActionStart = []byte("\"")
var ActionEnd = []byte("\"")

var Actions = []string{"index", "delete", "create", "update"}
var Actions = []string{ActionIndex, ActionDelete, ActionCreate, ActionUpdate}

func ParseActionMeta(data []byte) (action, index, typeName, id, routing string, err error) {
// Extract the first key of the JSON object to determine the action
err = jsonparser.ObjectEach(data, func(key []byte, value []byte, dataType jsonparser.ValueType, offset int) error {
action = string(key) // The first key is the action

match := false
for _, v := range Actions {
jsonparser.ObjectEach(data, func(key []byte, value []byte, dataType jsonparser.ValueType, offset int) error {
switch string(key) {
// Parse the nested object for metadata fields
parseErr := jsonparser.ObjectEach(value, func(metaKey []byte, metaValue []byte, dataType jsonparser.ValueType, offset int) error {
switch string(metaKey) {
case "_index":
index = string(value)
break
index = string(metaValue)
case "_type":
typeName = string(value)
break
typeName = string(metaValue)
case "_id":
id = string(value)
break
id = string(metaValue)
case "_routing":
routing = string(value)
break
routing = string(metaValue)
}
match = true
return nil
}, v)
action = v
if match {
return action, index, typeName, id, routing, nil
})

if parseErr != nil {
err = fmt.Errorf("error parsing metadata: %w", parseErr)
}

// Stop further iteration after processing the first key
return fmt.Errorf("break")
})

// If no action was found, or there was a parsing error
if err != nil && err.Error() != "break" {
return "", "", "", "", "", fmt.Errorf("invalid_meta_buffer: %v", string(data))
}

// Clear the error if "break" was used to stop iteration
if err != nil && err.Error() == "break" {
err = nil
}

return action, index, typeName, id, routing, errors.Errorf("invalid_meta_buffer: %v", string(data))
return action, index, typeName, id, routing, err
}
150 changes: 150 additions & 0 deletions core/elastic/bulk_parser_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
/* Copyright © INFINI LTD. All rights reserved.
* Web: https://infinilabs.com
* Email: hello#infini.ltd */

package elastic

import (
"fmt"
"github.com/buger/jsonparser"
"github.com/stretchr/testify/assert"
"infini.sh/framework/core/util"
"testing"
)

// TestParseActionMeta1 exercises the low-level building blocks used for bulk
// metadata handling: extracting the action name from the raw line, reading
// individual metadata fields with jsonparser.Get, and rewriting a field with
// jsonparser.Set.
func TestParseActionMeta1(t *testing.T) {

	data := []byte("{\"index\":{\"_index\":\"medcl1\",\"_type\":\"_doc\",\"_id\":\"GZq-bnYBC53QmW9Kk2ve\"}}")

	// The action name is the first double-quoted token of the line.
	action := util.ExtractFieldFromBytes(&data, ActionStart, ActionEnd, nil)
	fmt.Println(string(action))
	assert.Equal(t, "index", string(action))

	indexb, _, _, err := jsonparser.Get(data, util.UnsafeBytesToString(action), "_index")
	assert.NoError(t, err)
	fmt.Println(string(indexb))
	assert.Equal(t, "medcl1", string(indexb))

	idb, _, _, err := jsonparser.Get(data, util.UnsafeBytesToString(action), "_id")
	assert.NoError(t, err)
	assert.Equal(t, "GZq-bnYBC53QmW9Kk2ve", string(idb))

	// Update json bytes. jsonparser.Set splices the given bytes in verbatim,
	// so a string value has to carry its own surrounding quotes; otherwise
	// the result is not valid JSON.
	updated, err := jsonparser.Set(data, []byte("\"medcl2\""), "index", "_index")
	assert.NoError(t, err)
	fmt.Println("new:", string(updated))

	newIndex, err := jsonparser.GetString(updated, "index", "_index")
	assert.NoError(t, err)
	assert.Equal(t, "medcl2", newIndex)
}

// TestParseActionMeta2 verifies that ParseActionMeta returns the action name
// and the _index/_type/_id metadata for every supported bulk action,
// regardless of the order of the metadata keys, the presence of insignificant
// whitespace, or missing optional fields.
func TestParseActionMeta2(t *testing.T) {

	cases := []struct {
		line     string // raw bulk action/metadata line
		action   string
		index    string
		typeName string
		id       string
	}{
		// Same metadata with the keys in three different orders.
		{`{"index":{"_index":"medcl1","_type":"_doc","_id":"GZq-bnYBC53QmW9Kk2ve"}}`, "index", "medcl1", "_doc", "GZq-bnYBC53QmW9Kk2ve"},
		{`{"index":{"_type":"_doc","_id":"GZq-bnYBC53QmW9Kk2ve","_index":"medcl1"}}`, "index", "medcl1", "_doc", "GZq-bnYBC53QmW9Kk2ve"},
		{`{"index":{"_id":"GZq-bnYBC53QmW9Kk2ve","_type":"_doc","_index":"medcl1"}}`, "index", "medcl1", "_doc", "GZq-bnYBC53QmW9Kk2ve"},

		// No _id, one case per action.
		{`{"index":{"_index":"test","_type":"doc"}}`, "index", "test", "doc", ""},
		{`{"delete":{"_index":"test","_type":"_doc"}}`, "delete", "test", "_doc", ""},
		{`{"create":{"_index":"test","_type":"_doc"}}`, "create", "test", "_doc", ""},

		// Whitespace around tokens and progressively fewer metadata fields.
		{`{ "update" : {"_id" : "1", "_index" : "test"} }`, "update", "test", "", "1"},
		{`{ "update" : {"_index" : "test"} }`, "update", "test", "", ""},
		{`{ "update" : {} }`, "update", "", "", ""},
	}

	for _, c := range cases {
		action, index, typeName, id, _, err := ParseActionMeta([]byte(c.line))
		fmt.Println(action, index, id)

		// The raw line is passed as the failure message so a failing case is
		// identifiable without subtests.
		assert.NoError(t, err, c.line)
		assert.Equal(t, c.action, action, c.line)
		assert.Equal(t, c.index, index, c.line)
		assert.Equal(t, c.typeName, typeName, c.line)
		assert.Equal(t, c.id, id, c.line)
	}
}

// TestParseActionMeta3 verifies UpdateBulkMetadata: existing metadata fields
// are replaced in place, missing fields are added, and empty replacement
// values leave the corresponding field untouched.
func TestParseActionMeta3(t *testing.T) {

	cases := []struct {
		line     string // original bulk action/metadata line
		index    string // replacement values; "" means "leave as is"
		typeName string
		id       string
		expected string
	}{
		{
			line:     `{"index":{"_index":"medcl1","_type":"_doc","_id":"GZq-bnYBC53QmW9Kk2ve"}}`,
			index:    "newIndex",
			typeName: "newType",
			id:       "newId",
			expected: `{"index":{"_index":"newIndex","_type":"newType","_id":"newId"}}`,
		},
		{
			// _type is absent from the input and ends up after _id.
			line:     `{"index":{"_index":"medcl1","_id":"GZq-bnYBC53QmW9Kk2ve"}}`,
			index:    "newIndex",
			typeName: "newType",
			id:       "newId",
			expected: `{"index":{"_index":"newIndex","_id":"newId","_type":"newType"}}`,
		},
		{
			// _id is absent from the input and is added at the end.
			line:     `{"index":{"_index":"medcl1","_type":"doc1"}}`,
			index:    "newIndex",
			typeName: "newType",
			id:       "newId",
			expected: `{"index":{"_index":"newIndex","_type":"newType","_id":"newId"}}`,
		},
		{
			// Empty index/type replacements keep the original values.
			line:     `{"index":{"_index":"medcl1","_type":"doc1"}}`,
			id:       "newId",
			expected: `{"index":{"_index":"medcl1","_type":"doc1","_id":"newId"}}`,
		},
	}

	for _, c := range cases {
		newData, err := UpdateBulkMetadata("index", []byte(c.line), c.index, c.typeName, c.id)
		fmt.Println(err, string(newData))
		assert.NoError(t, err, c.line)
		assert.Equal(t, c.expected, string(newData), c.line)
	}
}
72 changes: 70 additions & 2 deletions core/elastic/bulk_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ type BulkDetailItem struct {
Reasons []string `json:"reasons,omitempty"`
}

func WalkBulkRequests(data []byte, eachLineFunc func(eachLine []byte) (skipNextLine bool),
func WalkBulkRequests(pathStr string, data []byte, eachLineFunc func(eachLine []byte) (skipNextLine bool),
metaFunc func(metaBytes []byte, actionStr, index, typeName, id, routing string, offset int) (err error),
payloadFunc func(payloadBytes []byte, actionStr, index, typeName, id, routing string),
operationFunc func(actionStr, index, typeName, id, routing string)) (int, error) {
Expand All @@ -127,6 +127,11 @@ func WalkBulkRequests(data []byte, eachLineFunc func(eachLine []byte) (skipNextL
var id string
var routing string

var urlLevelIndex string
var urlLevelType string

urlLevelIndex, urlLevelType = ParseUrlLevelBulkMeta(pathStr)

for i, line := range lines {

bytesCount := len(line)
Expand Down Expand Up @@ -155,6 +160,7 @@ func WalkBulkRequests(data []byte, eachLineFunc func(eachLine []byte) (skipNextL
if global.Env().IsDebug {
log.Trace(docCount, ",", actionStr, index, typeName, id, routing, err, ",", string(line))
}

if err != nil {
if global.Env().IsDebug {
if util.ContainStr(err.Error(), "invalid_meta_buffer") {
Expand All @@ -167,6 +173,36 @@ func WalkBulkRequests(data []byte, eachLineFunc func(eachLine []byte) (skipNextL
panic(err)
}

//get the index/type from url's path
if index == "" {

var indexNew, typeNew, idNew string

//only handle empty index
if index == "" && urlLevelIndex != "" {
index = urlLevelIndex
indexNew = urlLevelIndex
} else {
panic("target index was missing")
}

if typeName == "" && urlLevelType != "" {
typeName = urlLevelType
typeNew = urlLevelType
}

if indexNew != "" || typeNew != "" || idNew != "" {
var err error
line, err = UpdateBulkMetadata(actionStr, line, indexNew, typeNew, idNew)
if err != nil {
panic(err)
}
if global.Env().IsDebug {
log.Trace("updated meta,", id, ",", line)
}
}
}

err = metaFunc(line, actionStr, index, typeName, id, routing, docCount)
if err != nil {
panic(err)
Expand Down Expand Up @@ -201,6 +237,37 @@ func WalkBulkRequests(data []byte, eachLineFunc func(eachLine []byte) (skipNextL
return docCount, nil
}

// UpdateBulkMetadata returns a copy of the bulk action/metadata line
// scannedByte in which the _index, _type and _id fields nested under the
// action key are set to index, typeName and id respectively. An empty
// replacement value means "leave that field alone". Fields are applied in the
// order _index, _type, _id; the first jsonparser.Set failure aborts the
// update and is returned together with whatever Set produced.
//
// Each value is wrapped in double quotes and handed to jsonparser.Set as-is:
// no JSON escaping is performed here.
// NOTE(review): a value containing `"` or `\` would presumably yield invalid
// JSON — confirm that callers only pass plain identifiers.
func UpdateBulkMetadata(action string, scannedByte []byte, index, typeName, id string) (newBytes []byte, err error) {

	if global.Env().IsDebug {
		log.Trace("update:", action, ",", index, ",", typeName, ",", id)
	}

	// All edits are applied to a private copy of the input line.
	newBytes = append(make([]byte, 0, len(scannedByte)), scannedByte...)

	// Ordered list of metadata overrides; the order matters because a field
	// that does not exist yet is added by Set rather than replaced in place.
	overrides := [...]struct{ field, value string }{
		{"_index", index},
		{"_type", typeName},
		{"_id", id},
	}

	for _, o := range overrides {
		if o.value == "" {
			continue
		}
		newBytes, err = jsonparser.Set(newBytes, []byte("\""+o.value+"\""), action, o.field)
		if err != nil {
			return newBytes, err
		}
	}

	return newBytes, err
}

func ParseUrlLevelBulkMeta(pathStr string) (urlLevelIndex, urlLevelType string) {

if !util.SuffixStr(pathStr, "_bulk") {
Expand Down Expand Up @@ -671,6 +738,7 @@ func HandleBulkResponse(req *fasthttp.Request, resp *fasthttp.Response, tag util
nonRetryableItems.ResetData()
retryableItems.ResetData()
successItems.ResetData()
pathStr := util.UnsafeBytesToString(req.PhantomURI().Path())

containError := util.LimitedBytesSearch(resbody, []byte("\"errors\":true"), 64)
var statsCodeStats = map[int]int{}
Expand Down Expand Up @@ -755,7 +823,7 @@ func HandleBulkResponse(req *fasthttp.Request, resp *fasthttp.Response, tag util

var match = false
var retryable = false
WalkBulkRequests(requestBytes, func(eachLine []byte) (skipNextLine bool) {
WalkBulkRequests(pathStr, requestBytes, func(eachLine []byte) (skipNextLine bool) {
return false
}, func(metaBytes []byte, actionStr, index, typeName, id, routing string, offset int) (err error) {
if reqFailed {
Expand Down
2 changes: 1 addition & 1 deletion core/elastic/bulk_processors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func TestBulkWalkLines(t *testing.T) {
"{ \"create\" : { \"_index\" : \"test\", \"_id\" : \"3\" } }\n{ \"field1\" : \"value3\" }\n" +
"{ \"update\" : {\"_id\" : \"1\", \"_index\" : \"test\"} }\n{ \"doc\" : {\"field2\" : \"value2\"} }\n"

WalkBulkRequests([]byte(bulkRequests), func(eachLine []byte) (skipNextLine bool) {
WalkBulkRequests("", []byte(bulkRequests), func(eachLine []byte) (skipNextLine bool) {
//fmt.Println(string(eachLine))
return false
}, func(metaBytes []byte, actionStr, index, typeName, id, routing string, offset int) (err error) {
Expand Down
2 changes: 2 additions & 0 deletions core/elastic/domain_actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -857,6 +857,8 @@ func (metadata *ElasticsearchMetadata) GetIndexRoutingTable(index string) (map[s
}
}
}
}else{
//index was not found
}
return table.Shards, nil
}
Expand Down
2 changes: 2 additions & 0 deletions docs/content.en/docs/release-notes/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ Information about release notes of INFINI Framework is provided here.
### Bug fix
### Improvements
- Add util to http handler, support to parse bool parameter
- Handle simplified bulk metadata, parse index from url path #59


## v1.1.0 (2025-01-11)

Expand Down
2 changes: 1 addition & 1 deletion plugins/elastic/bulk_indexing/bulk_indexing.go
Original file line number Diff line number Diff line change
Expand Up @@ -853,7 +853,7 @@ READ_DOCS:
var totalOps, sliceOps int
var collectMeta = false
// document level slicing, check each document_id, slice by document hash
elastic.WalkBulkRequests(pop.Data, func(eachLine []byte) (skipNextLine bool) {
elastic.WalkBulkRequests("", pop.Data, func(eachLine []byte) (skipNextLine bool) {
return false
}, func(metaBytes []byte, actionStr, index, typeName, id, routing string, offset int) (err error) {
totalOps++
Expand Down

0 comments on commit 68ff853

Please sign in to comment.