Skip to content

Commit

Permalink
Refactor CertChange logic to use semaphore
Browse files Browse the repository at this point in the history
  • Loading branch information
nabsul committed Nov 13, 2022
1 parent 68de7ac commit 195be72
Showing 1 changed file with 43 additions and 35 deletions.
78 changes: 43 additions & 35 deletions Services/CertChangeService.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
Expand All @@ -14,8 +15,9 @@ public class CertChangeService
private readonly K8sClient _k8s;
private readonly KCertClient _kcert;

private volatile Task _runningTask = Task.CompletedTask;
private volatile Task _nextTask = null;
private DateTime _lastRun = DateTime.MinValue;

private SemaphoreSlim _sem = new(1, 1);

public CertChangeService(ILogger<CertChangeService> log, K8sClient k8s, KCertClient kcert)
{
Expand All @@ -31,55 +33,61 @@ public CertChangeService(ILogger<CertChangeService> log, K8sClient k8s, KCertCli
// - However, it will not queue up multiple checks
public void RunCheck()
{
_log.LogInformation("Preparing for RunCheck");
lock(this)
{
if (_nextTask != null)
{
_log.LogInformation("Check already queued. Nothing to do.");
return;
}

_log.LogInformation("Queueing up a check for changes.");
_nextTask = CheckForChangesAsync(_runningTask);
}
_ = CheckForChangesAsync();
}

private async Task CheckForChangesAsync(Task previous)
private async Task CheckForChangesAsync()
{
await previous;
_log.LogInformation("Starting check for changes.");

lock(this)
var start = DateTime.UtcNow;
_log.LogInformation("Waiting for semaphore");

await _sem.WaitAsync();
if (_lastRun > start)
{
_runningTask = _nextTask;
_nextTask = null;
_log.LogInformation("No need to run this check");
_sem.Release();
return;
}

// fetch all ingresses to figure out which certs need have which hosts
var nsLookup = new Dictionary<(string Namespace, string Name), HashSet<string>>();
await foreach (var (ns, name, hosts) in MergeAsync(GetIngressCertsAsync(), GetConfigMapCertsAsync()))
_lastRun = start;
_log.LogInformation("Starting check for changes.");

try
{
var key = (ns, name);
if (!nsLookup.TryGetValue(key, out var currHosts))
// fetch all ingresses to figure out which certs need have which hosts
var nsLookup = new Dictionary<(string Namespace, string Name), HashSet<string>>();
await foreach (var (ns, name, hosts) in MergeAsync(GetIngressCertsAsync(), GetConfigMapCertsAsync()))
{
currHosts = new HashSet<string>();
nsLookup.Add(key, currHosts);
var key = (ns, name);
if (!nsLookup.TryGetValue(key, out var currHosts))
{
currHosts = new HashSet<string>();
nsLookup.Add(key, currHosts);
}

foreach (var h in hosts)
{
currHosts.Add(h);
}
}

foreach (var h in hosts)
foreach (var ((ns, name), hosts) in nsLookup)
{
currHosts.Add(h);
_log.LogInformation("Handling cert {ns} - {name} hosts: {h}", ns, name, string.Join(",", hosts));
await _kcert.RenewIfNeededAsync(ns, name, hosts.ToArray(), CancellationToken.None);
}
}

foreach (var ((ns, name), hosts) in nsLookup)
_log.LogInformation("Check for changes completed.");
}
catch(Exception ex)
{
_log.LogError("Failed to check for cert changes.", ex);
}
finally
{
_log.LogInformation("Handling cert {ns} - {name} hosts: {h}", ns, name, string.Join(",", hosts));
await _kcert.RenewIfNeededAsync(ns, name, hosts.ToArray(), CancellationToken.None);
_sem.Release();
}

_log.LogInformation("Check for changes completed.");
}

private static async IAsyncEnumerable<T> MergeAsync<T>(params IAsyncEnumerable<T>[] enumerators)
Expand Down

0 comments on commit 195be72

Please sign in to comment.