Skip to content

Commit

Permalink
feat: seamlessly retry requests on backend failure for the Elasticsea…
Browse files Browse the repository at this point in the history
…rch filter (#63)

* feat: allow to seamless retry on backend failure

* chore: wording

* chore: update release notes

* chore: update logging message
  • Loading branch information
medcl authored Jan 15, 2025
1 parent 32d495e commit 8cd9c85
Show file tree
Hide file tree
Showing 5 changed files with 111 additions and 43 deletions.
6 changes: 6 additions & 0 deletions docs/content.en/docs/references/filters/elasticsearch.md
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,12 @@ flow:
| read_buffer_size | int | Read cache size for an Elasticsearch request. The default value is `4096*4`. |
| write_buffer_size | int | Write cache size for an Elasticsearch request. The default value is `4096*4`. |
| tls_insecure_skip_verify | bool | Whether to ignore TLS certificate verification of an Elasticsearch cluster. The default value is `true`. |
| max_retry_times | int | The maximum number of retry attempts for requests. The default value is `5`. |
| retry_on_backend_failure | bool | Whether to retry requests when backend failures occur. Used to switch to another available host. The default value is `true`. |
| retry_readonly_on_backend_failure | bool | Whether to retry read-only requests (e.g., `GET`/`HEAD`) on backend failure. This is generally safe, as retrying a read does not risk data duplication or corruption. The default value is `true`. |
| retry_writes_on_backend_failure | bool | Whether to retry write requests (e.g., `POST`/`PUT`/`PATCH`/`DELETE`) on backend failure. Use with caution, as retries can lead to duplicate writes. It is recommended to use this option together with additional filters. The default value is `false`. |
| retry_on_backend_busy | bool | Whether to retry requests when the backend is busy and responds with status code `429`. This helps handle temporary overloads or throttling. The default value is `false`. |
| retry_delay_in_ms | int | The delay in milliseconds between retry attempts. Does not apply when switching hosts. The default value is `1000`. |
| balancer | string | Load balancing algorithm for back-end Elasticsearch nodes. Currently, only the weight-based algorithm `weight` is available. |
| skip_metadata_enrich | bool | Whether to skip the processing of Elasticsearch metadata and not add `X-*` metadata to the headers of the request and response. |
| refresh.enable | bool | Whether to enable automatic refresh of node status changes, to perceive changes in the back-end Elasticsearch topology |
Expand Down
2 changes: 2 additions & 0 deletions docs/content.en/docs/release-notes/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ Information about release notes of INFINI Gateway is provided here.

### Features
- Support simple bulk metadata in bulk related filters (#59)
- Seamlessly retry requests on backend failure for the Elasticsearch filter (#63)


### Bug fix

Expand Down
14 changes: 8 additions & 6 deletions proxy/output/elastic/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,14 @@ type ProxyConfig struct {
Elasticsearch string `config:"elasticsearch"`
Balancer string `config:"balancer"`

MaxConnection int `config:"max_connection_per_node"`
MaxResponseBodySize int `config:"max_response_size"`
MaxRetryTimes int `config:"max_retry_times"`
RetryOnBackendFailure bool `config:"retry_on_backend_failure"`
RetryOnBackendBusy bool `config:"retry_on_backend_busy"`
RetryDelayInMs int `config:"retry_delay_in_ms"`
MaxConnection int `config:"max_connection_per_node"`
MaxResponseBodySize int `config:"max_response_size"`
MaxRetryTimes int `config:"max_retry_times"`
RetryOnBackendFailure bool `config:"retry_on_backend_failure"`
RetryReadonlyOnlyOnBackendFailure bool `config:"retry_readonly_on_backend_failure"` //it is usually safe to retry read-only requests (GET/HEAD verbs only); writes may fail partially, so retrying them may cause duplicated writes
RetryWriteOpsOnBackendFailure bool `config:"retry_writes_on_backend_failure"` //POST/PUT/PATCH requests, which may not be safe to retry; you can still opt in, preferably in combination with other flows/filters
RetryOnBackendBusy bool `config:"retry_on_backend_busy"`
RetryDelayInMs int `config:"retry_delay_in_ms"`

MaxConnWaitTimeout time.Duration `config:"max_conn_wait_timeout"`
MaxIdleConnDuration time.Duration `config:"max_idle_conn_duration"`
Expand Down
4 changes: 3 additions & 1 deletion proxy/output/elastic/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,10 @@ func New(c *config.Config) (pipeline.Filter, error) {
Balancer: "weight",
MaxResponseBodySize: 100 * 1024 * 1024,
MaxConnection: 5000,
MaxRetryTimes: 0,
MaxRetryTimes: 5,
RetryDelayInMs: 1000,
RetryReadonlyOnlyOnBackendFailure: true,
RetryOnBackendFailure: true,
TLSInsecureSkipVerify: true,
ReadBufferSize: 4096 * 4,
WriteBufferSize: 4096 * 4,
Expand Down
128 changes: 92 additions & 36 deletions proxy/output/elastic/reverseproxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"context"
"crypto/tls"
"fmt"
"github.com/emirpasic/gods/sets/hashset"
"infini.sh/framework/core/errors"
"math/rand"
"net"
Expand Down Expand Up @@ -346,7 +347,7 @@ func (p *ReverseProxy) getHostClient() (clientAvailable bool, client *fasthttp.H
}

if len(p.hostClients) == 0 || len(p.endpoints) == 0 {
log.Error("no upstream found")
log.Error("no upstream was found")
return false, nil, ""
}

Expand Down Expand Up @@ -462,36 +463,6 @@ func (p *ReverseProxy) DelegateRequest(elasticsearch string, metadata *elastic.E
cleanHopHeaders(&myctx.Request)
}

var pc fasthttp.ClientAPI
var host string

if p.proxyConfig.FixedClient {
pc = p.client
host = p.host
} else {
//var ok bool
//使用算法来获取合适的 client
switch metadata.Config.ClientMode {
case "client":
_, pc, host = p.getClient()
break
case "host":
_, pc, host = p.getHostClient()
break
default:
_, pc, host = p.getClient()
}

if !p.proxyConfig.SkipAvailableCheck && !elastic.IsHostAvailable(host) {
old := host
host = metadata.GetActiveHost()
if rate.GetRateLimiterPerSecond("proxy-host-not-available", old, 1).Allow() {
log.Infof("host [%v] is not available, fallback: [%v]", old, host)
}
pc = metadata.GetHttpClient(host)
}
}

// modify schema,align with elasticsearch's schema
originalHost := string(myctx.Request.Header.Host())
originalSchema := myctx.Request.GetSchema()
Expand Down Expand Up @@ -519,18 +490,71 @@ func (p *ReverseProxy) DelegateRequest(elasticsearch string, metadata *elastic.E
myctx.Request.Header.Add(fasthttp.HeaderXForwardedHost, originalHost)
}

curHost := string(myctx.Request.Host())

//choose a server to perform the request
var pc fasthttp.ClientAPI
var host string

var retryChooseHost = true
var skippedHost *hashset.Set
retry := 0
START:

if retryChooseHost {
retryChooseHost = false

oldHost := host
if p.proxyConfig.FixedClient {
pc = p.client
host = p.host
} else {
//var ok bool
switch metadata.Config.ClientMode {
case "client":
_, pc, host = p.getClient()
break
case "host":
_, pc, host = p.getHostClient()
break
default:
_, pc, host = p.getClient()
}

if !p.proxyConfig.SkipAvailableCheck && !elastic.IsHostAvailable(host) {
old := host
host = metadata.GetActiveHost()
if rate.GetRateLimiterPerSecond("proxy-host-not-available", old, 1).Allow() {
log.Infof("host [%v] is not available, fallback: [%v]", old, host)
}
pc = metadata.GetHttpClient(host)
}
}

//the selection may return the same host again; make sure the failed node is skipped
if oldHost != "" && oldHost == host {
if skippedHost != nil {
if len(p.endpoints) > 1 {
for _, v := range p.endpoints {
if !skippedHost.Contains(v) { //the node is not in failure set, try use it
host = v
pc = metadata.GetHttpClient(host)
log.Trace("re-choose new host:", host)
}
}
}
}
}
}

if global.Env().IsDebug {
log.Tracef("send request [%v] to upstream [%v]", myctx.Request.PhantomURI().String(), host)
}

curHost := string(myctx.Request.Host())
if host != curHost || host != originalHost {
myctx.Request.SetHostBytes([]byte(host))
}

retry := 0
START:

metadata.CheckNodeTrafficThrottle(host, 1, myctx.Request.GetRequestLength(), 0)

var err error
Expand All @@ -557,6 +581,38 @@ START:
elastic.GetOrInitHost(host, metadata.Config.ID).ReportFailure()
}
//server failure flow

if p.proxyConfig.RetryOnBackendFailure {
if global.Env().IsDebug {
log.Trace("retry on background failure")
}
if skippedHost == nil {
skippedHost = hashset.New()
}
skippedHost.Add(host)
method := string(myctx.Method())
//writes
if method == fasthttp.MethodPost || method == fasthttp.MethodPut || method == fasthttp.MethodPatch || method == fasthttp.MethodDelete {

if global.Env().IsDebug {
log.Trace("retry writes on background failure")
}

if p.proxyConfig.RetryWriteOpsOnBackendFailure {
retryAble = true
retryChooseHost = true
}

} else if p.proxyConfig.RetryReadonlyOnlyOnBackendFailure {
if global.Env().IsDebug {
log.Trace("retry reads on background failure")
}
//reads
retryAble = true
retryChooseHost = true
}
}

} else if res.StatusCode() == 429 {
if p.proxyConfig.RetryOnBackendBusy {
retryAble = true
Expand All @@ -566,7 +622,7 @@ START:
if retryAble {
retry++
if p.proxyConfig.MaxRetryTimes > 0 && retry < p.proxyConfig.MaxRetryTimes {
if p.proxyConfig.RetryDelayInMs > 0 {
if p.proxyConfig.RetryDelayInMs > 0 && !retryChooseHost { // No delay is required when switching hosts.
time.Sleep(time.Duration(p.proxyConfig.RetryDelayInMs) * time.Millisecond)
}
myctx.Request.Header.Add("RETRY_AT", time.Now().String())
Expand Down

0 comments on commit 8cd9c85

Please sign in to comment.