Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

LLM_EXTRACT_TEXT implementation #18435

Merged
merged 13 commits into from
Oct 31, 2024
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ require (
github.com/itchyny/gojq v0.12.16
github.com/jhump/protoreflect v1.15.2
github.com/json-iterator/go v1.1.12
github.com/ledongthuc/pdf v0.0.0-20240201131950-da5b75280b06
github.com/lni/dragonboat/v4 v4.0.0-20220815145555-6f622e8bcbef
github.com/lni/goutils v1.3.1-0.20220604063047-388d67b4dbc4
github.com/lni/vfs v0.2.1-0.20220616104132-8852fd867376
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,8 @@ github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/labstack/echo/v4 v4.1.11/go.mod h1:i541M3Fj6f76NZtHSj7TXnyM8n2gaodfvfxNnFqi74g=
github.com/labstack/echo/v4 v4.5.0/go.mod h1:czIriw4a0C1dFun+ObrXp7ok03xON0N1awStJ6ArI7Y=
github.com/labstack/gommon v0.3.0/go.mod h1:MULnywXg0yavhxWKc+lOruYdAhDwPK9wf0OL7NoOu+k=
github.com/ledongthuc/pdf v0.0.0-20240201131950-da5b75280b06 h1:kacRlPN7EN++tVpGUorNGPn/4DnB7/DfTY82AOn6ccU=
github.com/ledongthuc/pdf v0.0.0-20240201131950-da5b75280b06/go.mod h1:imJHygn/1yfhB7XSJJKlFZKl/J+dCPAknuiaGOshXAs=
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 h1:6E+4a0GO5zZEnZ81pIr0yLvtUWk2if982qA3F3QD6H4=
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0/go.mod h1:zJYVVT2jmtg6P3p1VtQj7WsuWi/y4VnjVBn7F8KPB3I=
github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
Expand Down
5 changes: 0 additions & 5 deletions pkg/container/types/datalink.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,6 @@ func ParseDatalink(fsPath string) (string, []int, string, error) {

// 2. get file extension
extension := filepath.Ext(u.Path)
switch extension {
case ".txt", ".csv":
default:
return "", nil, "", moerr.NewNYINoCtxf("unsupported file type %s", extension)
}

// 3. get size and offset from the query
urlParams := make(map[string]string)
Expand Down
178 changes: 178 additions & 0 deletions pkg/sql/plan/function/func_llm.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
package function

import (
"context"
"github.com/ledongthuc/pdf"
"github.com/matrixorigin/matrixone/pkg/common/moerr"
"github.com/matrixorigin/matrixone/pkg/common/util"
"github.com/matrixorigin/matrixone/pkg/container/types"
"github.com/matrixorigin/matrixone/pkg/container/vector"
"github.com/matrixorigin/matrixone/pkg/fileservice"
"github.com/matrixorigin/matrixone/pkg/vm/process"
"strings"
)

// LLMExtractText function
func LLMExtractText(parameters []*vector.Vector, result vector.FunctionResultWrapper, proc *process.Process, length int, selectList *FunctionSelectList) error {
input := vector.GenerateFunctionStrParameter(parameters[0])
output := vector.GenerateFunctionStrParameter(parameters[1])
extractorType := vector.GenerateFunctionStrParameter(parameters[2])
rs := vector.MustFunctionResult[bool](result)

rowCount := uint64(length)

for i := uint64(0); i < rowCount; i++ {
inputBytes, nullInput := input.GetStrValue(i)
if nullInput {
if err := rs.AppendMustNullForBytesResult(); err != nil {
return err
}
continue
}

outputBytes, nullInput2 := output.GetStrValue(i)
if nullInput2 {
if err := rs.AppendMustNullForBytesResult(); err != nil {
return err
}
continue
}

extractorTypeBytes, nullInput3 := extractorType.GetStrValue(i)
if nullInput3 {
if err := rs.AppendMustNullForBytesResult(); err != nil {
return err
}
continue
}

inputPath := util.UnsafeBytesToString(inputBytes)
outputPath := util.UnsafeBytesToString(outputBytes)
extractorTypeString := util.UnsafeBytesToString(extractorTypeBytes)

moUrl, _, ext, err := types.ParseDatalink(inputPath)
if err != nil {
return err
}
if "."+extractorTypeString != ext {
return moerr.NewInvalidInputNoCtxf("File type and extractor type are not equal.")
}
if ext != ".pdf" {
return moerr.NewInvalidInputNoCtxf("Only pdf file supported.")
}

outputPathUrl, _, _, err := types.ParseDatalink(outputPath)
if err != nil {
return err
}

success, err := extractTextFromPdfAndWriteToFile(moUrl, outputPathUrl, proc)
if err != nil {
return err
}

// return whether the process completes successfully
if success {
if err := rs.Append(true, false); err != nil {
return err
}
} else {
if err := rs.Append(false, false); err != nil {
return err
}
}

}

return nil
}

func extractTextFromPdfAndWriteToFile(pdfPath string, txtPath string, proc *process.Process) (bool, error) {
// read PDF to string
content, err := readPdfToString(pdfPath)
if err != nil {
return false, moerr.NewInvalidInputNoCtxf("Invalid PDF input.")
}

// file service and write file
ctx := context.TODO()
charleschile marked this conversation as resolved.
Show resolved Hide resolved
fs, readPath, err := fileservice.GetForETL(ctx, proc.Base.FileService, txtPath)

// delete the file if txt file exist because Write() only works when a file does not exist
_, err = fs.StatFile(ctx, readPath)
charleschile marked this conversation as resolved.
Show resolved Hide resolved
if err == nil {
err1 := fs.Delete(ctx, readPath)
if err1 != nil {
return false, moerr.NewInvalidInputNoCtxf("Cannot remove file %s", readPath)
}
}

_, err = fileservice.DoWithRetry(
"BackupWrite",
func() (int, error) {
return 0, fs.Write(ctx, fileservice.IOVector{
FilePath: readPath,
Entries: []fileservice.IOEntry{
{
Offset: 0,
Size: int64(len(content)),
Data: []byte(content),
},
},
})
},
64,
fileservice.IsRetryableError,
)
if err != nil {
return false, err
}
return true, nil
}

func isSameSentence(current, last pdf.Text) bool {
return strings.TrimSpace(current.S) != "" &&
last.Font == current.Font &&
last.FontSize == current.FontSize &&
last.X == current.X &&
last.Y == current.Y
}

func readPdfToString(path string) (string, error) {
f, r, err := pdf.Open(path)
charleschile marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return "", err
}
defer func() {
if f != nil {
f.Close()
}
}()

var textBuilder strings.Builder
totalPage := r.NumPage()

for pageIndex := 1; pageIndex <= totalPage; pageIndex++ {
p := r.Page(pageIndex)
if p.V.IsNull() {
continue
}
var lastTextStyle pdf.Text
texts := p.Content().Text
for _, text := range texts {
if isSameSentence(text, lastTextStyle) {
lastTextStyle.S += text.S
} else {
if lastTextStyle.S != "" {
textBuilder.WriteString(lastTextStyle.S)
}
lastTextStyle = text
}
}
if lastTextStyle.S != "" {
textBuilder.WriteString(lastTextStyle.S + " ")
}
}

return textBuilder.String(), nil
}
127 changes: 127 additions & 0 deletions pkg/sql/plan/function/func_llm_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
package function

import (
"fmt"
"github.com/matrixorigin/matrixone/pkg/container/types"
"github.com/matrixorigin/matrixone/pkg/testutil"
"github.com/stretchr/testify/require"
"os"
"testing"
)

func TestLLMExtractText(t *testing.T) {
testCases := initLLMExtractTextCase()
wrongTestCases := initLLMExtractWrongTextCase()

proc := testutil.NewProcess()
for _, tc := range testCases {
fcTC := NewFunctionTestCase(proc, tc.inputs, tc.expect, LLMExtractText)
s, info := fcTC.Run()
require.True(t, s, fmt.Sprintf("case is '%s', err info is '%s'", tc.info, info))
}

for _, tc := range wrongTestCases {
fcTC := NewFunctionTestCase(proc, tc.inputs, tc.expect, LLMExtractText)
s, info := fcTC.Run()
require.True(t, s, fmt.Sprintf("case is '%s', err info is '%s'", tc.info, info))
}

}

func initLLMExtractTextCase() []tcTemp {
regularCases := []struct {
info string
input []string
output []string
extractorType []string
wants []bool
}{
{
info: "test encode - simple text",
input: []string{
fmt.Sprintf("file://%s/../../../../test/distributed/resources/llm_test/extract_text/MODocs1.pdf?offset=0&size=4", GetFilePath()),
fmt.Sprintf("file://%s/../../../../test/distributed/resources/llm_test/extract_text/example.pdf?offset=0&size=4", GetFilePath()),
},
output: []string{
fmt.Sprintf("file://%s/../../../../test/distributed/resources/llm_test/extract_text/MODocs1.txt", GetFilePath()),
fmt.Sprintf("file://%s/../../../../test/distributed/resources/llm_test/extract_text/example.txt", GetFilePath()),
},
extractorType: []string{
"pdf",
"pdf",
},
wants: []bool{
true,
true,
},
},
}

var testInputs = make([]tcTemp, 0, len(regularCases))
for _, c := range regularCases {
testInputs = append(testInputs, tcTemp{
info: c.info,
inputs: []FunctionTestInput{
NewFunctionTestInput(types.T_datalink.ToType(), c.input, []bool{}),
NewFunctionTestInput(types.T_datalink.ToType(), c.output, []bool{}),
NewFunctionTestInput(types.T_varchar.ToType(), c.extractorType, []bool{}),
},
expect: NewFunctionTestResult(types.T_bool.ToType(), false, c.wants, []bool{}),
})
}

return testInputs
}

func initLLMExtractWrongTextCase() []tcTemp {
regularCases := []struct {
info string
input []string
output []string
extractorType []string
wants []bool
}{
{
info: "test encode - simple text",
input: []string{
fmt.Sprintf("file://%s/../../../../test/distributed/resources/llm_test/extract_text/MODocs1.txt?offset=0&size=4", GetFilePath()),
"",
fmt.Sprintf("file://%s/../../../../test/distributed/resources/llm_test/extract_text/example.pdf?offset=0&size=4", GetFilePath()),
},
output: []string{
fmt.Sprintf("file://%s/../../../../test/distributed/resources/llm_test/extract_text/MODocs1.txt", GetFilePath()),
"",
fmt.Sprintf("file://%s/../../../../test/distributed/resources/llm_test/extract_text/example.txt", GetFilePath()),
},
extractorType: []string{
"pdf",
"",
"txt",
},
wants: []bool{
true,
true,
},
},
}

var testInputs = make([]tcTemp, 0, len(regularCases))
for _, c := range regularCases {
testInputs = append(testInputs, tcTemp{
info: c.info,
inputs: []FunctionTestInput{
NewFunctionTestInput(types.T_datalink.ToType(), c.input, []bool{}),
NewFunctionTestInput(types.T_datalink.ToType(), c.output, []bool{}),
NewFunctionTestInput(types.T_varchar.ToType(), c.extractorType, []bool{}),
},
expect: NewFunctionTestResult(types.T_bool.ToType(), true, c.wants, []bool{}),
})
}

return testInputs
}

func GetFilePath() string {
dir, _ := os.Getwd()
return dir
}
3 changes: 3 additions & 0 deletions pkg/sql/plan/function/function_id.go
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,7 @@ const (
BITMAP_COUNT
BITMAP_CONSTRUCT_AGG
BITMAP_OR_AGG
LLM_EXTRACT_TEXT

// FUNCTION_END_NUMBER is not a function, just a flag to record the max number of function.
// TODO: every one should put the new function id in front of this one if you want to make a new function.
Expand Down Expand Up @@ -700,4 +701,6 @@ var functionIdRegister = map[string]int32{
"bitmap_count": BITMAP_COUNT,
"bitmap_construct_agg": BITMAP_CONSTRUCT_AGG,
"bitmap_or_agg": BITMAP_OR_AGG,

"llm_extract_text": LLM_EXTRACT_TEXT,
}
21 changes: 21 additions & 0 deletions pkg/sql/plan/function/list_builtIn.go
Original file line number Diff line number Diff line change
Expand Up @@ -6532,6 +6532,27 @@ var supportedOthersBuiltIns = []FuncNew{
},
},
},

// function `LLM_EXTRACT_TEXT`
{
functionId: LLM_EXTRACT_TEXT,
class: plan.Function_STRICT,
layout: STANDARD_FUNCTION,
checkFn: fixedTypeMatch,

Overloads: []overload{
{
overloadId: 0,
args: []types.T{types.T_datalink, types.T_datalink, types.T_varchar},
retType: func(parameters []types.Type) types.Type {
return types.T_bool.ToType()
},
newOp: func() executeLogicOfOverload {
return LLMExtractText
},
},
},
},
}

func MoCtl(ivecs []*vector.Vector, result vector.FunctionResultWrapper, proc *process.Process, length int, _ *FunctionSelectList) (err error) {
Expand Down
6 changes: 6 additions & 0 deletions test/distributed/cases/function/func_llm_extract_file.result
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
select llm_extract_text(cast('file://$resources/llm_test/extract_text/MODocs1.pdf?offset=0&size=4' as datalink), cast('file://$resources/llm_test/extract_text/MODocs1.txt' as datalink), "pdf");
llm_extract_text(cast(file:///Users/charles/Desktop/codes/matrixone/matrixone/test/distributed/resources/llm_test/extract_text/MODocs1.pdf?offset=0&size=4 as datalink), cast(file:///Users/charles/Desktop/codes/matrixone/matrixone/test/distributed/resources/llm_test/extract_text/MODocs1.txt as datalink), pdf)
true
select llm_extract_text(cast('file://$resources/llm_test/extract_text/example.pdf?offset=0&size=4' as datalink), cast('file://$resources/llm_test/extract_text/example.txt' as datalink), "pdf");
llm_extract_text(cast(file:///Users/charles/Desktop/codes/matrixone/matrixone/test/distributed/resources/llm_test/extract_text/example.pdf?offset=0&size=4 as datalink), cast(file:///Users/charles/Desktop/codes/matrixone/matrixone/test/distributed/resources/llm_test/extract_text/example.txt as datalink), pdf)
true
2 changes: 2 additions & 0 deletions test/distributed/cases/function/func_llm_extract_file.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
select llm_extract_text(cast('file://$resources/llm_test/extract_text/MODocs1.pdf?offset=0&size=4' as datalink), cast('file://$resources/llm_test/extract_text/MODocs1.txt' as datalink), "pdf");
XuPeng-SH marked this conversation as resolved.
Show resolved Hide resolved
select llm_extract_text(cast('file://$resources/llm_test/extract_text/example.pdf?offset=0&size=4' as datalink), cast('file://$resources/llm_test/extract_text/example.txt' as datalink), "pdf");
Binary file not shown.
Binary file not shown.
Loading