From a5627a4f416f6633b121657b3da5c6202f4f0eb0 Mon Sep 17 00:00:00 2001 From: Piotr Icikowski Date: Fri, 21 Jul 2023 23:12:18 +0200 Subject: [PATCH] feat(pkg): add initial source code --- commons.go | 44 ++++++++++++++++++++++ commons_test.go | 45 +++++++++++++++++++++++ go.mod | 3 ++ go.sum | 0 probes.go | 62 +++++++++++++++++++++++++++++++ probes_test.go | 83 ++++++++++++++++++++++++++++++++++++++++++ stateful_probe.go | 52 ++++++++++++++++++++++++++ stateful_probe_test.go | 42 +++++++++++++++++++++ 8 files changed, 331 insertions(+) create mode 100644 commons.go create mode 100644 commons_test.go create mode 100644 go.mod create mode 100644 go.sum create mode 100644 probes.go create mode 100644 probes_test.go create mode 100644 stateful_probe.go create mode 100644 stateful_probe_test.go diff --git a/commons.go b/commons.go new file mode 100644 index 0000000..ef0b136 --- /dev/null +++ b/commons.go @@ -0,0 +1,44 @@ +package kubeprobes + +import "sync" + +// ProbeFunction is a function that determines whether +// the given metric may be marked as correctly functioning. +// It not, the error should be returned. +type ProbeFunction func() error + +type statusQuery struct { + allGreen bool + mux sync.Mutex + wg sync.WaitGroup +} + +func (sq *statusQuery) isAllGreen() bool { + sq.wg.Wait() + sq.mux.Lock() + defer sq.mux.Unlock() + return sq.allGreen +} + +func newStatusQuery(probes []ProbeFunction) *statusQuery { + sq := &statusQuery{ + allGreen: true, + mux: sync.Mutex{}, + wg: sync.WaitGroup{}, + } + + sq.wg.Add(len(probes)) + for _, probe := range probes { + probe := probe + go func() { + defer sq.wg.Done() + if err := probe(); err != nil { + sq.mux.Lock() + sq.allGreen = false + sq.mux.Unlock() + } + }() + } + + return sq +} diff --git a/commons_test.go b/commons_test.go new file mode 100644 index 0000000..f93b3f1 --- /dev/null +++ b/commons_test.go @@ -0,0 +1,45 @@ +package kubeprobes + +import ( + "errors" + "testing" + "time" +) + +func TestStatusQueryIsAllGreen(t *testing.T) { + tests := map[string]struct { + probes []ProbeFunction + expectedStatus bool + }{ + "all green": { + probes: []ProbeFunction{ + func() error { return nil }, + func() error { time.Sleep(2 * time.Second); return nil }, + }, + expectedStatus: true, + }, + "some failed": { + probes: []ProbeFunction{ + func() error { return nil }, + func() error { time.Sleep(2 * time.Second); return errors.New("failed") }, + }, + expectedStatus: false, + }, + "all failed": { + probes: []ProbeFunction{ + func() error { return errors.New("failed") }, + func() error { time.Sleep(2 * time.Second); return errors.New("failed") }, + }, + expectedStatus: false, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + sq := newStatusQuery(test.probes) + if sq.isAllGreen() != test.expectedStatus { + t.Errorf("expected status %v, got %v", test.expectedStatus, sq.isAllGreen()) + } + }) + } +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..a4ebc33 --- /dev/null +++ b/go.mod @@ -0,0 +1,3 @@ +module pkg.icikowski.pl/kubeprobes + +go 1.20 diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..e69de29 diff --git a/probes.go b/probes.go new file mode 100644 index 0000000..0343c89 --- /dev/null +++ b/probes.go @@ -0,0 +1,62 @@ +package kubeprobes + +import ( + "net/http" +) + +type kubeprobes struct { + livenessProbes []ProbeFunction + readinessProbes []ProbeFunction +} + +// ServeHTTP implements http.Handler interface +func (kp *kubeprobes) ServeHTTP(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case "/live": + sq := newStatusQuery(kp.livenessProbes) + if sq.isAllGreen() { + w.WriteHeader(http.StatusOK) + } else { + w.WriteHeader(http.StatusServiceUnavailable) + } + case "/ready": + sq := newStatusQuery(append(kp.livenessProbes, kp.readinessProbes...)) + if sq.isAllGreen() { + w.WriteHeader(http.StatusOK) + } else { + w.WriteHeader(http.StatusServiceUnavailable) + } + default: + w.WriteHeader(http.StatusNotFound) + } +} + +type option func(*kubeprobes) + +// New returns a new instance of a Kubernetes probes +func New(options ...option) *kubeprobes { + kp := &kubeprobes{ + livenessProbes: []ProbeFunction{}, + readinessProbes: []ProbeFunction{}, + } + + for _, option := range options { + option(kp) + } + + return kp +} + +// WithLivenessProbes adds given liveness probes to the set of probes +func WithLivenessProbes(probes ...ProbeFunction) option { + return func(kp *kubeprobes) { + kp.livenessProbes = append(kp.livenessProbes, probes...) + } +} + +// WithReadinessProbes adds given readiness probes to the set of probes +func WithReadinessProbes(probes ...ProbeFunction) option { + return func(kp *kubeprobes) { + kp.readinessProbes = append(kp.readinessProbes, probes...) + } +} diff --git a/probes_test.go b/probes_test.go new file mode 100644 index 0000000..9139480 --- /dev/null +++ b/probes_test.go @@ -0,0 +1,83 @@ +package kubeprobes + +import ( + "net/http" + "net/http/httptest" + "testing" +) + +func getStatusFromEndpoint(t *testing.T, client *http.Client, endpoint string) int { + t.Helper() + resp, err := client.Get(endpoint) + if err != nil { + t.Errorf("error getting status from endpoint: %s", err) + } + return resp.StatusCode +} + +func TestKubeprobes(t *testing.T) { + live, ready := NewStatefulProbe(), NewStatefulProbe() + + tests := map[string]struct { + livenessProbeTransformation func(*testing.T, *StatefulProbe) + readinessProbeTransformation func(*testing.T, *StatefulProbe) + expectedLiveStatus int + expectedReadyStatus int + }{ + "not live": { + livenessProbeTransformation: markAsDown, + readinessProbeTransformation: markAsDown, + expectedLiveStatus: http.StatusServiceUnavailable, + expectedReadyStatus: http.StatusServiceUnavailable, + }, + "live but not ready": { + livenessProbeTransformation: markAsUp, + readinessProbeTransformation: markAsDown, + expectedLiveStatus: http.StatusOK, + expectedReadyStatus: http.StatusServiceUnavailable, + }, + "live and ready": { + livenessProbeTransformation: markAsUp, + readinessProbeTransformation: markAsUp, + expectedLiveStatus: http.StatusOK, + expectedReadyStatus: http.StatusOK, + }, + "ready but not live - should never happen": { + livenessProbeTransformation: markAsDown, + readinessProbeTransformation: markAsUp, + expectedLiveStatus: http.StatusServiceUnavailable, + expectedReadyStatus: http.StatusServiceUnavailable, + }, + } + + kp := New( + WithLivenessProbes(live.GetProbeFunction()), + WithReadinessProbes(ready.GetProbeFunction()), + ) + + srv := httptest.NewServer(kp) + defer srv.Close() + client := srv.Client() + + for name, test := range tests { + name, test := name, test + t.Run(name, func(t *testing.T) { + test.livenessProbeTransformation(t, live) + test.readinessProbeTransformation(t, ready) + + liveStatus := getStatusFromEndpoint(t, client, srv.URL+"/live") + readyStatus := getStatusFromEndpoint(t, client, srv.URL+"/ready") + otherStatus := getStatusFromEndpoint(t, client, srv.URL+"/something") + + if liveStatus != test.expectedLiveStatus { + t.Errorf("expected live status %d, got %d", test.expectedLiveStatus, liveStatus) + } + if readyStatus != test.expectedReadyStatus { + t.Errorf("expected ready status %d, got %d", test.expectedReadyStatus, readyStatus) + } + if otherStatus != http.StatusNotFound { + t.Errorf("expected 404 status, got %d", otherStatus) + } + }) + } +} diff --git a/stateful_probe.go b/stateful_probe.go new file mode 100644 index 0000000..fba59b7 --- /dev/null +++ b/stateful_probe.go @@ -0,0 +1,52 @@ +package kubeprobes + +import ( + "errors" + "sync" +) + +var errProbeDown = errors.New("DOWN") + +// StatefulProbe represents the simple probe that can be either +// marked as "up" (healthy) or "down" (unhealthy). +type StatefulProbe struct { + status bool + mux sync.Mutex +} + +// NewStatefulProbe returns a new instance of a stateful probe +// which can be either marked as "up" (healthy) or "down" (unhealthy). +// The probe is initially marked as "down". +func NewStatefulProbe() *StatefulProbe { + return &StatefulProbe{ + status: false, + mux: sync.Mutex{}, + } +} + +// MarkAsUp marks the probe as healthy +func (sp *StatefulProbe) MarkAsUp() { + sp.mux.Lock() + defer sp.mux.Unlock() + sp.status = true +} + +// MarkAsDown marks the probe as unhealthy +func (sp *StatefulProbe) MarkAsDown() { + sp.mux.Lock() + defer sp.mux.Unlock() + sp.status = false +} + +// GetProbeFunction returns a function that can be used to check +// whether the probe is healthy or not. +func (sp *StatefulProbe) GetProbeFunction() ProbeFunction { + return func() error { + sp.mux.Lock() + defer sp.mux.Unlock() + if sp.status { + return nil + } + return errProbeDown + } +} diff --git a/stateful_probe_test.go b/stateful_probe_test.go new file mode 100644 index 0000000..2d45ac2 --- /dev/null +++ b/stateful_probe_test.go @@ -0,0 +1,42 @@ +package kubeprobes + +import "testing" + +var ( + markAsDown func(*testing.T, *StatefulProbe) = func(t *testing.T, sp *StatefulProbe) { + t.Helper() + sp.MarkAsDown() + } + markAsUp func(*testing.T, *StatefulProbe) = func(t *testing.T, sp *StatefulProbe) { + t.Helper() + sp.MarkAsUp() + } +) + +func TestStatefulProbe(t *testing.T) { + tests := map[string]struct { + probeTransformation func(*testing.T, *StatefulProbe) + expectedError bool + }{ + "mark as up": { + probeTransformation: markAsUp, + expectedError: false, + }, + "mark as down": { + probeTransformation: markAsDown, + expectedError: true, + }, + } + + for name, test := range tests { + name, test := name, test + t.Run(name, func(t *testing.T) { + sp := NewStatefulProbe() + test.probeTransformation(t, sp) + probeFunc := sp.GetProbeFunction() + if (probeFunc() != nil) != test.expectedError { + t.Error("result not as expected") + } + }) + } +}