-
Notifications
You must be signed in to change notification settings - Fork 1.2k
ScrollParallel
Oliver Eilhard edited this page Sep 10, 2016
·
11 revisions
The ScrollService
allows you to iterate through a large result set. The example below for elastic v3.0.50+ illustrates how to do that effectively as a pipeline, using multiple goroutines. We use the excellent golang.org/x/sync/errgroup
for our solution.
package main
import (
"encoding/json"
"io"
"golang.org/x/net/context"
"golang.org/x/sync/errgroup"
"gopkg.in/cheggaaa/pb.v1"
"gopkg.in/olivere/elastic.v3"
)
type Product struct {
SKU string `json:"sku"`
Name string `json:"name"`
}
func main() {
client, err := elastic.NewClient()
if err != nil {
panic(err)
}
// Count total and setup progress
total, err := client.Count("warehouse").Type("product").Do()
if err != nil {
panic(err)
}
bar := pb.StartNew(int(total))
// This example illustrates how to use goroutines to iterate
// through a result set via ScrollService.
//
// It uses the excellent golang.org/x/sync/errgroup package to do so.
//
// The first goroutine will Scroll through the result set and send
// individual documents to a channel.
//
// The second cluster of goroutines will receive documents from the channel and
// deserialize them.
//
// Feel free to add a third goroutine to do something with the
// deserialized results.
//
// Let's go.
// 1st goroutine sends individual hits to channel.
hits := make(chan json.RawMessage)
g, ctx := errgroup.WithContext(context.Background())
g.Go(func() error {
defer close(hits)
// Initialize scroller. Just don't call Do yet.
scroll := client.Scroll("warehouse").Type("product").Size(100)
for {
results, err := scroll.Do()
if err == io.EOF {
return nil // all results retrieved
}
if err != nil {
return err // something went wrong
}
// Send the hits to the hits channel
for _, hit := range results.Hits.Hits {
hits <- *hit.Source
}
// Check if we need to terminate early
select {
default:
case <-ctx.Done():
return ctx.Err()
}
}
})
// 2nd goroutine receives hits and deserializes them.
//
// If you want, setup a number of goroutines handling deserialization in parallel.
for i := 0; i < 10; i++ {
g.Go(func() error {
for hit := range hits {
// Deserialize
var p Product
err := json.Unmarshal(hit, &p)
if err != nil {
return err
}
// Do something with the product here, e.g. send it to another channel
// for further processing.
_ = p
bar.Increment()
// Terminate early?
select {
default:
case <-ctx.Done():
return ctx.Err()
}
}
return nil
})
}
// Check whether any goroutines failed.
if err := g.Wait(); err != nil {
panic(err)
}
// Done.
bar.FinishPrint("Done")
}
Notice: You can sort the results with ScrollService
. But that comes with a big performance hit. Elasticsearch has to sort results first before returning them to you. So if all you need is to get each result from a result set (potentially with a Query
), do not add a sort order. See here for details.