Skip to content

Commit

Permalink
Feature rework reader ops (#10)
Browse files Browse the repository at this point in the history
* Add Close methods to Stub and StubGroup.
* Add CloseAllStubs to close []StubGroup.
* Remove closing of reader in parser.
* Update examples.
* Update docstrings to indicate nil Ticks mean EOF
  • Loading branch information
teamjorge authored Aug 31, 2024
2 parents ebfac11 + 315c19d commit 9dc8d03
Show file tree
Hide file tree
Showing 7 changed files with 182 additions and 35 deletions.
1 change: 1 addition & 0 deletions examples/loader/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ func main() {
// We group our stubs mainly to be able to identify the batches we are loading
// This might not be necessary on your use case
groups := stubs.Group()
defer ibt.CloseAllStubs(groups)

for groupNumber, group := range groups {
// Create a new processor for this group and set the groupNumber.
Expand Down
1 change: 1 addition & 0 deletions examples/track_temp/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ func main() {
// We group the stubs by iRacing session. This allows us to summarise results for
// an entire session, instead of just a single ibt file.
groups := stubs.Group()
defer ibt.CloseAllStubs(groups)

for groupIdx, group := range groups {
// Create the instance(s) of your processor(s) for this group
Expand Down
5 changes: 3 additions & 2 deletions parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func NewParser(reader headers.Reader, header *headers.Header, whitelist ...strin
// Next parses and returns the next tick of telemetry variables and whether it can be called again.
//
// A return of false will indicate that the buffer has reached the end. If the buffer has reached the end and Next() is called again,
// a nil and false will be returned.
// a nil and false will be returned. Additionally, a check can be done to check if the returned Tick is nil to determine if the EOF was reached.
//
// Should expected variable values be missing, please ensure that they are added to the Parser whitelist.
func (p *Parser) Next() (Tick, bool) {
Expand All @@ -64,6 +64,8 @@ func (p *Parser) Next() (Tick, bool) {
//
// ParseAt is useful if a specific offset is known. An example would be the
// telemetry variable buffers that are provided during live telemetry parsing.
//
// When nil is returned, the buffer has reached EOF.
func (p *Parser) ParseAt(offset int) Tick {
currentBuf := p.read(offset)
if currentBuf == nil {
Expand All @@ -80,7 +82,6 @@ func (p *Parser) read(start int) []byte {
buf := make([]byte, p.header.TelemetryHeader.BufLen)
_, err := p.reader.ReadAt(buf, int64(start))
if err != nil {
defer p.reader.Close()
return nil
}

Expand Down
9 changes: 1 addition & 8 deletions processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,11 @@ func Process(ctx context.Context, stubs StubGroup, processors ...Processor) erro
}

func process(ctx context.Context, stub Stub, processors ...Processor) error {
reader, err := stub.Open()
if err != nil {
return err
}

defer reader.Close()

header := stub.header

whitelist := buildWhitelist(header.VarHeader, processors...)

parser := NewParser(reader, header, whitelist...)
parser := NewParser(stub.r, header, whitelist...)
for {
select {
case <-ctx.Done():
Expand Down
14 changes: 1 addition & 13 deletions processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func TestProcess(t *testing.T) {
}

stubs := StubGroup{
{filepath: ".testing/valid_test_file.ibt", header: testHeaders},
{filepath: ".testing/valid_test_file.ibt", header: testHeaders, r: f},
}

t.Run("test Process() normal processor", func(t *testing.T) {
Expand Down Expand Up @@ -77,18 +77,6 @@ func TestProcess(t *testing.T) {
}
})

t.Run("test process() invalid file", func(t *testing.T) {
proc := testProcessor{whitelist: []string{"LapCurrentLapTime"}}

invalidStub := Stub{
filepath: "disappear_here",
}

if err := process(context.Background(), invalidStub, &proc); err == nil {
t.Errorf("expected Process() to exit with a file error")
}
})

t.Run("test process() invalid file", func(t *testing.T) {
proc := testProcessor{whitelist: []string{"LapCurrentLapTime"}}

Expand Down
50 changes: 44 additions & 6 deletions stub.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package ibt

import (
"errors"
"fmt"
"os"
"sort"
Expand All @@ -16,18 +17,22 @@ import (
type Stub struct {
filepath string
header *headers.Header
r headers.Reader
}

// Open the underlying ibt file for reading
func (stub *Stub) Open() (headers.Reader, error) {
reader, err := os.Open(stub.Filename())
func (stub *Stub) Open() (err error) {
stub.r, err = os.Open(stub.Filename())
if err != nil {
return nil, fmt.Errorf("failed to open stub file %s for reading: %v", stub.Filename(), err)
return fmt.Errorf("failed to open stub file %s for reading: %v", stub.Filename(), err)
}

return reader, nil
return nil
}

// Close the stub reader
func (stub *Stub) Close() error { return stub.r.Close() }

// Filename where the stub originated from
func (stub *Stub) Filename() string { return stub.filepath }

Expand All @@ -54,13 +59,31 @@ func (stub *Stub) DriverIdx() int {
// This group is not necessarily part of the same session, but can be grouped with Group().
type StubGroup []Stub

// Close the reader for every stub in the group
func (sg StubGroup) Close() error {
errs := make([]error, 0)

for _, stub := range sg {
if err := stub.Close(); err != nil {
errs = append(errs, err)
}
}

if len(errs) > 0 {
return errors.Join(errs...)
}

return nil
}

// ParseStubs will create a stub for each of the given files by parsing their headers.
func ParseStubs(files ...string) (StubGroup, error) {
stubs := make(StubGroup, 0)

for _, file := range files {
stub, err := parseStub(file)
if err != nil {
stubs.Close()
return stubs, err
}

Expand All @@ -78,14 +101,13 @@ func parseStub(filename string) (Stub, error) {
if err != nil {
return stub, fmt.Errorf("failed to open file %s for reading: %v", filename, err)
}
defer f.Close()

header, err := headers.ParseHeaders(f)
if err != nil {
return stub, fmt.Errorf("failed to parse headers for file %s - %v", filename, err)
}

return Stub{filename, header}, nil
return Stub{filename, header, f}, nil
}

// Group stubs together by their iRacing session.
Expand Down Expand Up @@ -121,6 +143,22 @@ func (stubs StubGroup) Group() []StubGroup {
return groups
}

func CloseAllStubs(groups []StubGroup) error {
errs := make([]error, 0)

for _, group := range groups {
if err := group.Close(); err != nil {
errs = append(errs, err)
}
}

if len(errs) > 0 {
return errors.Join(errs...)
}

return nil
}

// groupTestSessionStubs ensures that ibt files from iRacing Test sessions are grouped correctly.
//
// The logic for grouping Test session files is slightly different due to the lack of subSessionIds
Expand Down
137 changes: 131 additions & 6 deletions stub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,7 @@ func TestStubs(t *testing.T) {
t.Run("stubs Open() valid file", func(t *testing.T) {
stub := Stub{filepath: ".testing/valid_test_file.ibt"}

f, err := stub.Open()
if err != nil {
if err := stub.Open(); err != nil {
t.Errorf("did not expect an error when opening file %s. received: %v", ".testing/valid_test_file.ibt", err)
}

Expand All @@ -90,8 +89,7 @@ func TestStubs(t *testing.T) {
t.Run("stubs Open() invalid file", func(t *testing.T) {
stub := Stub{filepath: ".testing/disappear_here.ibt"}

_, err := stub.Open()
if err == nil {
if err := stub.Open(); err == nil {
t.Errorf("expected an error when opening a non-existent file %s", ".testing/disappear_here.ibt")
}
})
Expand Down Expand Up @@ -452,5 +450,132 @@ func TestStubGroupSorting(t *testing.T) {
})
}

// Order is not always preserved with the slices
// Check based on length
func TestStubGroupClose(t *testing.T) {
t.Run("test normal close", func(t *testing.T) {
f1, err := os.Open(".testing/empty_test_file.ibt")
if err != nil {
t.Errorf("failed to open test file %v", err)
}
defer f1.Close()
f2, err := os.Open(".testing/empty_test_file.ibt")
if err != nil {
t.Errorf("failed to open test file %v", err)
}
defer f2.Close()

stubGroup := StubGroup{
Stub{filepath: "5.ibt", r: f1},
Stub{filepath: "3.ibt", r: f2},
}

err = stubGroup.Close()
if err != nil {
t.Errorf("expected stub group to close without error. received: %v", err)
}

if err := stubGroup[0].r.Close(); err == nil {
t.Errorf("expected stub 0 to be closed")
}

if err := stubGroup[1].r.Close(); err == nil {
t.Errorf("expected stub 0 to be closed")
}
})

t.Run("test close with error", func(t *testing.T) {
f1, err := os.Open(".testing/empty_test_file.ibt")
if err != nil {
t.Errorf("failed to open test file %v", err)
}
defer f1.Close()
f2, err := os.Open(".testing/empty_test_file.ibt")
if err != nil {
t.Errorf("failed to open test file %v", err)
}
f2.Close()

stubGroup := StubGroup{
Stub{filepath: "5.ibt", r: f1},
Stub{filepath: "3.ibt", r: f2},
}

err = stubGroup.Close()
if err == nil {
t.Error("expected stub group to close with an error")
}

if err.Error() != "close .testing/empty_test_file.ibt: file already closed" {
t.Errorf("expected error message to be %s. received: %s",
"close .testing/empty_test_file.ibt: file already closed", err.Error())
}
})
}

func TestCloseAllStubs(t *testing.T) {
t.Run("test normal close", func(t *testing.T) {
f1, err := os.Open(".testing/empty_test_file.ibt")
if err != nil {
t.Errorf("failed to open test file %v", err)
}
defer f1.Close()
f2, err := os.Open(".testing/empty_test_file.ibt")
if err != nil {
t.Errorf("failed to open test file %v", err)
}
defer f2.Close()

stubGroups := []StubGroup{
{
Stub{filepath: "5.ibt", r: f1},
},
{
Stub{filepath: "3.ibt", r: f2},
},
}

err = CloseAllStubs(stubGroups)
if err != nil {
t.Errorf("expected stub group to close without error. received: %v", err)
}

if err := stubGroups[0][0].r.Close(); err == nil {
t.Errorf("expected stub 0 to be closed")
}

if err := stubGroups[1][0].r.Close(); err == nil {
t.Errorf("expected stub 0 to be closed")
}
})

t.Run("test close with error", func(t *testing.T) {
f1, err := os.Open(".testing/empty_test_file.ibt")
if err != nil {
t.Errorf("failed to open test file %v", err)
}
defer f1.Close()
f2, err := os.Open(".testing/empty_test_file.ibt")
if err != nil {
t.Errorf("failed to open test file %v", err)
}
f2.Close()

stubGroups := []StubGroup{
{
Stub{filepath: "5.ibt", r: f1},
},
{
Stub{filepath: "3.ibt", r: f2},
},
}

err = CloseAllStubs(stubGroups)
if err == nil {
t.Error("expected stub group to close with an error")
}

if err.Error() != "close .testing/empty_test_file.ibt: file already closed" {
t.Errorf("expected error message to be %s. received: %s",
"close .testing/empty_test_file.ibt: file already closed", err.Error())
}
})
}

0 comments on commit 9dc8d03

Please sign in to comment.