Skip to content
tom10271 edited this page Apr 3, 2018 · 11 revisions

The ScrollService allows you to iterate through a large result set. The example below for elastic v3.0.50+ illustrates how to do that effectively as a pipeline, using multiple goroutines. We use the excellent golang.org/x/sync/errgroup for our solution.

package main

import (
	"encoding/json"
	"io"

	"github.com/olivere/elastic"
	"golang.org/x/net/context"
	"golang.org/x/sync/errgroup"
	"gopkg.in/cheggaaa/pb.v1"
)

type Product struct {
	SKU  string `json:"sku"`
	Name string `json:"name"`
}

func main() {
	client, err := elastic.NewClient()
	if err != nil {
		panic(err)
	}

	// Count total and setup progress
	total, err := client.Count("warehouse").Type("product").Do()
	if err != nil {
		panic(err)
	}
	bar := pb.StartNew(int(total))

	// This example illustrates how to use goroutines to iterate
	// through a result set via ScrollService.
	//
	// It uses the excellent golang.org/x/sync/errgroup package to do so.
	//
	// The first goroutine will Scroll through the result set and send
	// individual documents to a channel.
	//
	// The second cluster of goroutines will receive documents from the channel and
	// deserialize them.
	//
	// Feel free to add a third goroutine to do something with the
	// deserialized results.
	//
	// Let's go.

	// 1st goroutine sends individual hits to channel.
	resultsChannel := make(chan elastic.SearchResult)
	hitsChannel := make(chan json.RawMessage)
	g, ctx := errgroup.WithContext(context.Background())
	g.Go(func() error {
		defer close(hitsChannel)
		// Initialize scroller. Just don't call Do yet.
		scroll := client.Scroll("warehouse").Type("product").Size(100)
		for {
			results, err := scroll.Do()
			if err == io.EOF {
				return nil // all results retrieved
			}
			if err != nil {
				return err // something went wrong
			}

			// Send results to the result channel at once so that it won't block the next scroll request
			select {
			case resultsChannel <- *results:
			case <-ctx.Done():
				return ctx.Err()
			}
		}
		return nil
	})

	// Send hit once by once to hit channel
	g.Go(func() error {
		for results := range resultsChannel {
			for _, hit := range results.Hits.Hits {
				select {
				case hitsChannel <- *hit.Source:
				case <-ctx.Done():
					return ctx.Err()
				}
			}
		}

		return nil
	})

	// 2nd goroutine receives hits and deserializes them.
	//
	// If you want, setup a number of goroutines handling deserialization in parallel.
	for i := 0; i < 10; i++ {
		g.Go(func() error {
			for hit := range hitsChannel {
				// Deserialize
				var p Product
				err := json.Unmarshal(hit, &p)
				if err != nil {
					return err
				}

				// Do something with the product here, e.g. send it to another channel
				// for further processing.
				_ = p

				bar.Increment()

				// Terminate early?
				select {
				default:
				case <-ctx.Done():
					return ctx.Err()
				}
			}
			return nil
		})
	}

	// Check whether any goroutines failed.
	if err := g.Wait(); err != nil {
		panic(err)
	}

	// Done.
	bar.FinishPrint("Done")
}

Notice: You can sort the results with ScrollService. But that comes with a big performance hit. Elasticsearch has to sort results first before returning them to you. So if all you need is to get each result from a result set (potentially with a Query), do not add a sort order. See here for details.

Clone this wiki locally