Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature rework reader ops #10

Merged
merged 2 commits into from
Aug 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions examples/loader/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ func main() {
// We group our stubs mainly to be able to identify the batches we are loading
// This might not be necessary on your use case
groups := stubs.Group()
defer ibt.CloseAllStubs(groups)

for groupNumber, group := range groups {
// Create a new processor for this group and set the groupNumber.
Expand Down
1 change: 1 addition & 0 deletions examples/track_temp/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ func main() {
// We group the stubs by iRacing session. This allows us to summarise results for
// an entire session, instead of just a single ibt file.
groups := stubs.Group()
defer ibt.CloseAllStubs(groups)

for groupIdx, group := range groups {
// Create the instance(s) of your processor(s) for this group
Expand Down
5 changes: 3 additions & 2 deletions parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func NewParser(reader headers.Reader, header *headers.Header, whitelist ...strin
// Next parses and returns the next tick of telemetry variables and whether it can be called again.
//
// A return of false will indicate that the buffer has reached the end. If the buffer has reached the end and Next() is called again,
// a nil and false will be returned.
// a nil and false will be returned. Additionally, a check can be done to check if the returned Tick is nil to determine if the EOF was reached.
//
// Should expected variable values be missing, please ensure that they are added to the Parser whitelist.
func (p *Parser) Next() (Tick, bool) {
Expand All @@ -64,6 +64,8 @@ func (p *Parser) Next() (Tick, bool) {
//
// ParseAt is useful if a specific offset is known. An example would be the
// telemetry variable buffers that are provided during live telemetry parsing.
//
// When nil is returned, the buffer has reached EOF.
func (p *Parser) ParseAt(offset int) Tick {
currentBuf := p.read(offset)
if currentBuf == nil {
Expand All @@ -80,7 +82,6 @@ func (p *Parser) read(start int) []byte {
buf := make([]byte, p.header.TelemetryHeader.BufLen)
_, err := p.reader.ReadAt(buf, int64(start))
if err != nil {
defer p.reader.Close()
return nil
}

Expand Down
9 changes: 1 addition & 8 deletions processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,11 @@ func Process(ctx context.Context, stubs StubGroup, processors ...Processor) erro
}

func process(ctx context.Context, stub Stub, processors ...Processor) error {
reader, err := stub.Open()
if err != nil {
return err
}

defer reader.Close()

header := stub.header

whitelist := buildWhitelist(header.VarHeader, processors...)

parser := NewParser(reader, header, whitelist...)
parser := NewParser(stub.r, header, whitelist...)
for {
select {
case <-ctx.Done():
Expand Down
14 changes: 1 addition & 13 deletions processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func TestProcess(t *testing.T) {
}

stubs := StubGroup{
{filepath: ".testing/valid_test_file.ibt", header: testHeaders},
{filepath: ".testing/valid_test_file.ibt", header: testHeaders, r: f},
}

t.Run("test Process() normal processor", func(t *testing.T) {
Expand Down Expand Up @@ -77,18 +77,6 @@ func TestProcess(t *testing.T) {
}
})

t.Run("test process() invalid file", func(t *testing.T) {
proc := testProcessor{whitelist: []string{"LapCurrentLapTime"}}

invalidStub := Stub{
filepath: "disappear_here",
}

if err := process(context.Background(), invalidStub, &proc); err == nil {
t.Errorf("expected Process() to exit with a file error")
}
})

t.Run("test process() invalid file", func(t *testing.T) {
proc := testProcessor{whitelist: []string{"LapCurrentLapTime"}}

Expand Down
50 changes: 44 additions & 6 deletions stub.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package ibt

import (
"errors"
"fmt"
"os"
"sort"
Expand All @@ -16,18 +17,22 @@ import (
type Stub struct {
filepath string
header *headers.Header
r headers.Reader
}

// Open the underlying ibt file for reading
func (stub *Stub) Open() (headers.Reader, error) {
reader, err := os.Open(stub.Filename())
func (stub *Stub) Open() (err error) {
stub.r, err = os.Open(stub.Filename())
if err != nil {
return nil, fmt.Errorf("failed to open stub file %s for reading: %v", stub.Filename(), err)
return fmt.Errorf("failed to open stub file %s for reading: %v", stub.Filename(), err)
}

return reader, nil
return nil
}

// Close the stub reader
func (stub *Stub) Close() error { return stub.r.Close() }

// Filename where the stub originated from
func (stub *Stub) Filename() string { return stub.filepath }

Expand All @@ -54,13 +59,31 @@ func (stub *Stub) DriverIdx() int {
// This group is not necessarily part of the same session, but can be grouped with Group().
type StubGroup []Stub

// Close the reader for every stub in the group
func (sg StubGroup) Close() error {
errs := make([]error, 0)

for _, stub := range sg {
if err := stub.Close(); err != nil {
errs = append(errs, err)
}
}

if len(errs) > 0 {
return errors.Join(errs...)
}

return nil
}

// ParseStubs will create a stub for each of the given files by parsing their headers.
func ParseStubs(files ...string) (StubGroup, error) {
stubs := make(StubGroup, 0)

for _, file := range files {
stub, err := parseStub(file)
if err != nil {
stubs.Close()
return stubs, err
}

Expand All @@ -78,14 +101,13 @@ func parseStub(filename string) (Stub, error) {
if err != nil {
return stub, fmt.Errorf("failed to open file %s for reading: %v", filename, err)
}
defer f.Close()

header, err := headers.ParseHeaders(f)
if err != nil {
return stub, fmt.Errorf("failed to parse headers for file %s - %v", filename, err)
}

return Stub{filename, header}, nil
return Stub{filename, header, f}, nil
}

// Group stubs together by their iRacing session.
Expand Down Expand Up @@ -121,6 +143,22 @@ func (stubs StubGroup) Group() []StubGroup {
return groups
}

func CloseAllStubs(groups []StubGroup) error {
errs := make([]error, 0)

for _, group := range groups {
if err := group.Close(); err != nil {
errs = append(errs, err)
}
}

if len(errs) > 0 {
return errors.Join(errs...)
}

return nil
}

// groupTestSessionStubs ensures that ibt files from iRacing Test sessions are grouped correctly.
//
// The logic for grouping Test session files is slightly different due to the lack of subSessionIds
Expand Down
137 changes: 131 additions & 6 deletions stub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,7 @@ func TestStubs(t *testing.T) {
t.Run("stubs Open() valid file", func(t *testing.T) {
stub := Stub{filepath: ".testing/valid_test_file.ibt"}

f, err := stub.Open()
if err != nil {
if err := stub.Open(); err != nil {
t.Errorf("did not expect an error when opening file %s. received: %v", ".testing/valid_test_file.ibt", err)
}

Expand All @@ -90,8 +89,7 @@ func TestStubs(t *testing.T) {
t.Run("stubs Open() invalid file", func(t *testing.T) {
stub := Stub{filepath: ".testing/disappear_here.ibt"}

_, err := stub.Open()
if err == nil {
if err := stub.Open(); err == nil {
t.Errorf("expected an error when opening a non-existent file %s", ".testing/disappear_here.ibt")
}
})
Expand Down Expand Up @@ -452,5 +450,132 @@ func TestStubGroupSorting(t *testing.T) {
})
}

// Order is not always preserved with the slices
// Check based on length
func TestStubGroupClose(t *testing.T) {
t.Run("test normal close", func(t *testing.T) {
f1, err := os.Open(".testing/empty_test_file.ibt")
if err != nil {
t.Errorf("failed to open test file %v", err)
}
defer f1.Close()
f2, err := os.Open(".testing/empty_test_file.ibt")
if err != nil {
t.Errorf("failed to open test file %v", err)
}
defer f2.Close()

stubGroup := StubGroup{
Stub{filepath: "5.ibt", r: f1},
Stub{filepath: "3.ibt", r: f2},
}

err = stubGroup.Close()
if err != nil {
t.Errorf("expected stub group to close without error. received: %v", err)
}

if err := stubGroup[0].r.Close(); err == nil {
t.Errorf("expected stub 0 to be closed")
}

if err := stubGroup[1].r.Close(); err == nil {
t.Errorf("expected stub 0 to be closed")
}
})

t.Run("test close with error", func(t *testing.T) {
f1, err := os.Open(".testing/empty_test_file.ibt")
if err != nil {
t.Errorf("failed to open test file %v", err)
}
defer f1.Close()
f2, err := os.Open(".testing/empty_test_file.ibt")
if err != nil {
t.Errorf("failed to open test file %v", err)
}
f2.Close()

stubGroup := StubGroup{
Stub{filepath: "5.ibt", r: f1},
Stub{filepath: "3.ibt", r: f2},
}

err = stubGroup.Close()
if err == nil {
t.Error("expected stub group to close with an error")
}

if err.Error() != "close .testing/empty_test_file.ibt: file already closed" {
t.Errorf("expected error message to be %s. received: %s",
"close .testing/empty_test_file.ibt: file already closed", err.Error())
}
})
}

func TestCloseAllStubs(t *testing.T) {
t.Run("test normal close", func(t *testing.T) {
f1, err := os.Open(".testing/empty_test_file.ibt")
if err != nil {
t.Errorf("failed to open test file %v", err)
}
defer f1.Close()
f2, err := os.Open(".testing/empty_test_file.ibt")
if err != nil {
t.Errorf("failed to open test file %v", err)
}
defer f2.Close()

stubGroups := []StubGroup{
{
Stub{filepath: "5.ibt", r: f1},
},
{
Stub{filepath: "3.ibt", r: f2},
},
}

err = CloseAllStubs(stubGroups)
if err != nil {
t.Errorf("expected stub group to close without error. received: %v", err)
}

if err := stubGroups[0][0].r.Close(); err == nil {
t.Errorf("expected stub 0 to be closed")
}

if err := stubGroups[1][0].r.Close(); err == nil {
t.Errorf("expected stub 0 to be closed")
}
})

t.Run("test close with error", func(t *testing.T) {
f1, err := os.Open(".testing/empty_test_file.ibt")
if err != nil {
t.Errorf("failed to open test file %v", err)
}
defer f1.Close()
f2, err := os.Open(".testing/empty_test_file.ibt")
if err != nil {
t.Errorf("failed to open test file %v", err)
}
f2.Close()

stubGroups := []StubGroup{
{
Stub{filepath: "5.ibt", r: f1},
},
{
Stub{filepath: "3.ibt", r: f2},
},
}

err = CloseAllStubs(stubGroups)
if err == nil {
t.Error("expected stub group to close with an error")
}

if err.Error() != "close .testing/empty_test_file.ibt: file already closed" {
t.Errorf("expected error message to be %s. received: %s",
"close .testing/empty_test_file.ibt: file already closed", err.Error())
}
})
}
Loading