diff --git a/docs/content.en/docs/references/filters/elasticsearch.md b/docs/content.en/docs/references/filters/elasticsearch.md index a7ffb959..9b5689b6 100644 --- a/docs/content.en/docs/references/filters/elasticsearch.md +++ b/docs/content.en/docs/references/filters/elasticsearch.md @@ -139,6 +139,12 @@ flow: | read_buffer_size | int | Read cache size for an Elasticsearch request. The default value is `4096*4`. | | write_buffer_size | int | Write cache size for an Elasticsearch request. The default value is `4096*4`. | | tls_insecure_skip_verify | bool | Whether to ignore TLS certificate verification of an Elasticsearch cluster. The default value is `true`. | +| max_retry_times | int | The maximum number of retry attempts for requests. The default value is `5`. | +| retry_on_backend_failure | bool | Whether to retry requests when backend failures occur. Used to switch to another available host. The default value is `true`. | +| retry_readonly_on_backend_failure| bool | Whether to retry readonly requests (e.g., `GET`/`HEAD`) on backend failure. This is generally safe as it does not risk data duplication or corruption. The default value is `true`. | +| retry_writes_on_backend_failure | bool | Whether to retry write operations (e.g., `POST`/`PUT`/`PATCH`) on backend failure. Use with caution, as retries can lead to duplicate writes. Recommended to use with additional filters. The default value is `false`. | +| retry_on_backend_busy | bool | Whether to retry requests when the backend is busy with status code `429`. This helps handle temporary overloads or throttling. The default value is `false`. | +| retry_delay_in_ms | int | The delay in milliseconds between retry attempts. Does not apply when switching hosts. The default value is `1000`. | | balancer | string | Load balancing algorithm of a back-end Elasticsearch node. Currently, only the `weight` weight-based algorithm is available. | | skip_metadata_enrich | bool | Whether to skip the processing of Elasticsearch metadata and not add `X-*` metadata to the header of the request and response | | refresh.enable | bool | Whether to enable automatic refresh of node status changes, to perceive changes in the back-end Elasticsearch topology | diff --git a/docs/content.en/docs/release-notes/_index.md b/docs/content.en/docs/release-notes/_index.md index 7018405f..238941e3 100644 --- a/docs/content.en/docs/release-notes/_index.md +++ b/docs/content.en/docs/release-notes/_index.md @@ -13,6 +13,8 @@ Information about release notes of INFINI Gateway is provided here. ### Features - Support simple bulk metadata in bulk related filters (#59) +- Seamlessly retry requests on backend failure for the Elasticsearch filter (#63) + ### Bug fix diff --git a/proxy/output/elastic/config.go b/proxy/output/elastic/config.go index da9a5513..c445f8fa 100755 --- a/proxy/output/elastic/config.go +++ b/proxy/output/elastic/config.go @@ -29,12 +29,14 @@ type ProxyConfig struct { Elasticsearch string `config:"elasticsearch"` Balancer string `config:"balancer"` - MaxConnection int `config:"max_connection_per_node"` - MaxResponseBodySize int `config:"max_response_size"` - MaxRetryTimes int `config:"max_retry_times"` - RetryOnBackendFailure bool `config:"retry_on_backend_failure"` - RetryOnBackendBusy bool `config:"retry_on_backend_busy"` - RetryDelayInMs int `config:"retry_delay_in_ms"` + MaxConnection int `config:"max_connection_per_node"` + MaxResponseBodySize int `config:"max_response_size"` + MaxRetryTimes int `config:"max_retry_times"` + RetryOnBackendFailure bool `config:"retry_on_backend_failure"` + RetryReadonlyOnlyOnBackendFailure bool `config:"retry_readonly_on_backend_failure"` //usually it is safety to retry readonly requests, GET/HEAD verbs only, as write may have partial failure, retry may cause duplicated writes + RetryWriteOpsOnBackendFailure bool `config:"retry_writes_on_backend_failure"` //POST/PUT/PATCH requests, which means may not good for retry, but you can sill opt it on, and it is preferred to work with other flow/filters + RetryOnBackendBusy bool `config:"retry_on_backend_busy"` + RetryDelayInMs int `config:"retry_delay_in_ms"` MaxConnWaitTimeout time.Duration `config:"max_conn_wait_timeout"` MaxIdleConnDuration time.Duration `config:"max_idle_conn_duration"` diff --git a/proxy/output/elastic/elasticsearch.go b/proxy/output/elastic/elasticsearch.go index 5de23619..60baed04 100644 --- a/proxy/output/elastic/elasticsearch.go +++ b/proxy/output/elastic/elasticsearch.go @@ -94,8 +94,10 @@ func New(c *config.Config) (pipeline.Filter, error) { Balancer: "weight", MaxResponseBodySize: 100 * 1024 * 1024, MaxConnection: 5000, - MaxRetryTimes: 0, + MaxRetryTimes: 5, RetryDelayInMs: 1000, + RetryReadonlyOnlyOnBackendFailure: true, + RetryOnBackendFailure: true, TLSInsecureSkipVerify: true, ReadBufferSize: 4096 * 4, WriteBufferSize: 4096 * 4, diff --git a/proxy/output/elastic/reverseproxy.go b/proxy/output/elastic/reverseproxy.go index 54ae8b4a..cdf2f42e 100644 --- a/proxy/output/elastic/reverseproxy.go +++ b/proxy/output/elastic/reverseproxy.go @@ -27,6 +27,7 @@ import ( "context" "crypto/tls" "fmt" + "github.com/emirpasic/gods/sets/hashset" "infini.sh/framework/core/errors" "math/rand" "net" @@ -346,7 +347,7 @@ func (p *ReverseProxy) getHostClient() (clientAvailable bool, client *fasthttp.H } if len(p.hostClients) == 0 || len(p.endpoints) == 0 { - log.Error("no upstream found") + log.Error("no upstream was found") return false, nil, "" } @@ -462,36 +463,6 @@ func (p *ReverseProxy) DelegateRequest(elasticsearch string, metadata *elastic.E cleanHopHeaders(&myctx.Request) } - var pc fasthttp.ClientAPI - var host string - - if p.proxyConfig.FixedClient { - pc = p.client - host = p.host - } else { - //var ok bool - //使用算法来获取合适的 client - switch metadata.Config.ClientMode { - case "client": - _, pc, host = p.getClient() - break - case "host": - _, pc, host = p.getHostClient() - break - default: - _, pc, host = p.getClient() - } - - if !p.proxyConfig.SkipAvailableCheck && !elastic.IsHostAvailable(host) { - old := host - host = metadata.GetActiveHost() - if rate.GetRateLimiterPerSecond("proxy-host-not-available", old, 1).Allow() { - log.Infof("host [%v] is not available, fallback: [%v]", old, host) - } - pc = metadata.GetHttpClient(host) - } - } - // modify schema,align with elasticsearch's schema originalHost := string(myctx.Request.Header.Host()) originalSchema := myctx.Request.GetSchema() @@ -519,18 +490,71 @@ func (p *ReverseProxy) DelegateRequest(elasticsearch string, metadata *elastic.E myctx.Request.Header.Add(fasthttp.HeaderXForwardedHost, originalHost) } + curHost := string(myctx.Request.Host()) + + //choose a server to perform the request + var pc fasthttp.ClientAPI + var host string + + var retryChooseHost = true + var skippedHost *hashset.Set + retry := 0 +START: + + if retryChooseHost { + retryChooseHost = false + + oldHost := host + if p.proxyConfig.FixedClient { + pc = p.client + host = p.host + } else { + //var ok bool + switch metadata.Config.ClientMode { + case "client": + _, pc, host = p.getClient() + break + case "host": + _, pc, host = p.getHostClient() + break + default: + _, pc, host = p.getClient() + } + + if !p.proxyConfig.SkipAvailableCheck && !elastic.IsHostAvailable(host) { + old := host + host = metadata.GetActiveHost() + if rate.GetRateLimiterPerSecond("proxy-host-not-available", old, 1).Allow() { + log.Infof("host [%v] is not available, fallback: [%v]", old, host) + } + pc = metadata.GetHttpClient(host) + } + } + + //it may select the same host, we need ensure skip the failure node + if oldHost != "" && oldHost == host { + if skippedHost != nil { + if len(p.endpoints) > 1 { + for _, v := range p.endpoints { + if !skippedHost.Contains(v) { //the node is not in failure set, try use it + host = v + pc = metadata.GetHttpClient(host) + log.Trace("re-choose new host:", host) + } + } + } + } + } + } + if global.Env().IsDebug { log.Tracef("send request [%v] to upstream [%v]", myctx.Request.PhantomURI().String(), host) } - curHost := string(myctx.Request.Host()) if host != curHost || host != originalHost { myctx.Request.SetHostBytes([]byte(host)) } - retry := 0 -START: - metadata.CheckNodeTrafficThrottle(host, 1, myctx.Request.GetRequestLength(), 0) var err error @@ -557,6 +581,38 @@ START: elastic.GetOrInitHost(host, metadata.Config.ID).ReportFailure() } //server failure flow + + if p.proxyConfig.RetryOnBackendFailure { + if global.Env().IsDebug { + log.Trace("retry on background failure") + } + if skippedHost == nil { + skippedHost = hashset.New() + } + skippedHost.Add(host) + method := string(myctx.Method()) + //writes + if method == fasthttp.MethodPost || method == fasthttp.MethodPut || method == fasthttp.MethodPatch || method == fasthttp.MethodDelete { + + if global.Env().IsDebug { + log.Trace("retry writes on background failure") + } + + if p.proxyConfig.RetryWriteOpsOnBackendFailure { + retryAble = true + retryChooseHost = true + } + + } else if p.proxyConfig.RetryReadonlyOnlyOnBackendFailure { + if global.Env().IsDebug { + log.Trace("retry reads on background failure") + } + //reads + retryAble = true + retryChooseHost = true + } + } + } else if res.StatusCode() == 429 { if p.proxyConfig.RetryOnBackendBusy { retryAble = true @@ -566,7 +622,7 @@ START: if retryAble { retry++ if p.proxyConfig.MaxRetryTimes > 0 && retry < p.proxyConfig.MaxRetryTimes { - if p.proxyConfig.RetryDelayInMs > 0 { + if p.proxyConfig.RetryDelayInMs > 0 && !retryChooseHost { // No delay is required when switching hosts. time.Sleep(time.Duration(p.proxyConfig.RetryDelayInMs) * time.Millisecond) } myctx.Request.Header.Add("RETRY_AT", time.Now().String())