-
-
Notifications
You must be signed in to change notification settings - Fork 24
/
Copy pathK8sWatchClient.cs
92 lines (78 loc) · 3.75 KB
/
K8sWatchClient.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
using k8s;
using k8s.Autorest;
using k8s.Models;
namespace KCert.Services;
[Service]
public class K8sWatchClient(KCertConfig cfg, ILogger<K8sClient> log, Kubernetes client)
{
public const string CertRequestKey = "kcert.dev/cert-request";
public const string CertRequestValue = "request";
public const string IngressLabelKey = "kcert.dev/ingress";
public string IngressLabel => $"{IngressLabelKey}={cfg.IngressLabelValue}";
public string ConfigLabel => $"{CertRequestKey}={CertRequestValue}";
public delegate Task ChangeCallback<T>(WatchEventType type, T item);
public Task WatchIngressesAsync(ChangeCallback<V1Ingress> callback, CancellationToken tok)
{
return WatchInLoopAsync(callback, WatchAllIngressAsync, WatchNsIngressAsync, tok);
}
private Task<HttpOperationResponse<V1IngressList>> WatchAllIngressAsync(CancellationToken tok)
{
return client.NetworkingV1.ListIngressForAllNamespacesWithHttpMessagesAsync(watch: true, cancellationToken: tok, labelSelector: IngressLabel);
}
private Task<HttpOperationResponse<V1IngressList>> WatchNsIngressAsync(string ns, CancellationToken tok)
{
return client.NetworkingV1.ListNamespacedIngressWithHttpMessagesAsync(ns, watch: true, cancellationToken: tok, labelSelector: IngressLabel);
}
public Task WatchConfigMapsAsync(ChangeCallback<V1ConfigMap> callback, CancellationToken tok)
{
return WatchInLoopAsync(callback, WatchAllConfigMapsAsync, WatchNsConfigMapsAsync, tok);
}
private Task<HttpOperationResponse<V1ConfigMapList>> WatchAllConfigMapsAsync(CancellationToken tok)
{
return client.CoreV1.ListConfigMapForAllNamespacesWithHttpMessagesAsync(watch: true, cancellationToken: tok, labelSelector: ConfigLabel);
}
private Task<HttpOperationResponse<V1ConfigMapList>> WatchNsConfigMapsAsync(string ns, CancellationToken tok)
{
return client.CoreV1.ListNamespacedConfigMapWithHttpMessagesAsync(ns, watch: true, cancellationToken: tok, labelSelector: ConfigLabel);
}
delegate Task<HttpOperationResponse<L>> WatchAllFunc<L>(CancellationToken tok);
delegate Task<HttpOperationResponse<L>> WatchNsFunc<L>(string ns, CancellationToken tok);
private Task WatchInLoopAsync<L, T>(ChangeCallback<T> callback, WatchAllFunc<L> all, WatchNsFunc<L> ns, CancellationToken tok)
{
return cfg.NamespaceConstraints.Length == 0 ? WatchInLoopAsync(typeof(T).Name, callback, all, tok) : WatchInLoopAsync(callback, ns, tok);
}
private Task WatchInLoopAsync<L, T>(ChangeCallback<T> callback, WatchNsFunc<L> func, CancellationToken tok)
{
return Task.WhenAll(cfg.NamespaceConstraints
.Select(ns => WatchInLoopAsync($"{ns}:{typeof(T).Name}", callback, (t) => func(ns, t), tok))
.ToArray()
);
}
private async Task WatchInLoopAsync<L, T>(string id, ChangeCallback<T> callback, WatchAllFunc<L> watch, CancellationToken tok)
{
var typeName = typeof(T).Name;
while (true)
{
try
{
await foreach (var (type, item) in watch(tok).WatchAsync<T, L>())
{
await callback(type, item);
}
}
catch (HttpRequestException ex)
{
if (ex.Message == "Error while copying content to a stream.")
{
log.LogInformation("Empty Kubernetes client result threw an exception watching [{id}]. Trying again after 5 seconds.", id);
await Task.Delay(TimeSpan.FromSeconds(5), tok);
}
else
{
log.LogError("Unexpected error watching [{id}]", id);
throw;
}
}
}
}
}