-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
5 changed files
with
149 additions
and
2 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,25 @@ | ||
# loader | ||
|
||
## Overview | ||
|
||
The `loader` example shows a pattern for loading telemetry data to an external destination. This could be loading it into a database, posting to an API, or even storing it in a file. | ||
|
||
The practices in this example should not be copied verbatim, but should show how `ibt` can be used to achieve this. | ||
|
||
Goals of the example: | ||
|
||
* Parse the `ibt` files into groups | ||
* For each group of `ibts` process each tick of telemetry data | ||
* When the threshold of processed telemetry ticks have been reached, perform a bulk load to the storage client | ||
* Add a group number to each telemetry tick to ensure they are easily filtered in the external storage layer | ||
* Store the number of batches loaded and print it after processing each group | ||
|
||
## Running | ||
|
||
```shell | ||
go run examples/track_temp/*.go | ||
|
||
# Or with your own files | ||
|
||
go run examples/track_temp/*.go /path/to/telem/files/*.ibt | ||
``` |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,43 @@ | ||
package main | ||
|
||
import ( | ||
"context" | ||
"log" | ||
|
||
"github.com/teamjorge/ibt" | ||
"github.com/teamjorge/ibt/examples" | ||
) | ||
|
||
func main() { | ||
// Parse the files into stubs | ||
stubs, err := examples.ParseExampleStubs() | ||
if err != nil { | ||
log.Fatal(err) | ||
} | ||
|
||
// Create our storage client | ||
storage := newStorage() | ||
if err := storage.Connect(); err != nil { | ||
log.Fatal(err) | ||
} | ||
// Close it when the application ends | ||
defer storage.Close() | ||
|
||
// We group our stubs mainly to be able to identify the batches we are loading | ||
// This might not be necessary on your use case | ||
groups := stubs.Group() | ||
|
||
for groupNumber, group := range groups { | ||
// Create a new processor for this group and set the groupNumber. | ||
// It embeds our storage and we set our loading threshold to 100 | ||
processor := newLoaderProcessor(storage, groupNumber, 100) | ||
|
||
// Process the group | ||
if err := ibt.Process(context.Background(), group, processor); err != nil { | ||
log.Fatalf("failed to process telemetry for stubs %v: %v", stubs, err) | ||
} | ||
|
||
// Print the number of batches loaded after each group | ||
log.Printf("%d batches loaded after group %d\n", storage.Loaded(), groupNumber) | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,22 @@ | ||
package main | ||
|
||
// This is a mock external storage client. | ||
// | ||
// Think of it as a database, API, or external file | ||
type storage struct { | ||
batchesLoaded int | ||
} | ||
|
||
// Simple constructor | ||
func newStorage() *storage { return new(storage) } | ||
|
||
func (s *storage) Connect() error { return nil } | ||
|
||
func (s *storage) Exec(data []map[string]interface{}) error { | ||
s.batchesLoaded += len(data) | ||
return nil | ||
} | ||
|
||
func (s *storage) Close() error { return nil } | ||
|
||
func (s *storage) Loaded() int { return s.batchesLoaded } |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,57 @@ | ||
package main | ||
|
||
import ( | ||
"fmt" | ||
|
||
"github.com/teamjorge/ibt" | ||
"github.com/teamjorge/ibt/headers" | ||
) | ||
|
||
type loaderProcessor struct { | ||
// Our storage client | ||
*storage | ||
// Cache for holding the number of telemetry ticks equal to threshold | ||
cache []map[string]interface{} | ||
// Number | ||
groupNumber int | ||
threshold int | ||
} | ||
|
||
// Simple Constructor for creating our processor | ||
func newLoaderProcessor(storage *storage, groupNumber int, threshold int) *loaderProcessor { | ||
return &loaderProcessor{storage, make([]map[string]interface{}, 0), groupNumber, threshold} | ||
} | ||
|
||
// Columns we want to parse from telemetry | ||
func (l *loaderProcessor) Whitelist() []string { | ||
return []string{ | ||
"Lap", "ThrottleRaw", "BrakeRaw", "ClutchRaw", "LapDistPct", "Lat", "Lon", | ||
} | ||
} | ||
|
||
// Our method for processing a single tick of telemetry. | ||
func (l *loaderProcessor) Process(input ibt.Tick, hasNext bool, session *headers.Session) error { | ||
// Add our group number to the tick of telemetry. | ||
// This will be useful to seperate ticks by group in our storage. | ||
input["groupNum"] = l.groupNumber | ||
|
||
// Add it to the cache | ||
l.cache = append(l.cache, input) | ||
|
||
// If our cache is past the threshold, that means we can now do a bulk load | ||
// to our storage. | ||
if len(l.cache) >= l.threshold { | ||
if err := l.loadBatch(); err != nil { | ||
return fmt.Errorf("failed to load batch - %v", err) | ||
} | ||
// Empty the cache again | ||
l.cache = make([]map[string]interface{}, 0) | ||
} | ||
|
||
return nil | ||
} | ||
|
||
func (l *loaderProcessor) loadBatch() error { | ||
// Bulk load our batch to storage. | ||
return l.Exec(l.cache) | ||
} |