feat(pkg): add initial source code
This commit is contained in:
parent
b86b58ec88
commit
a5627a4f41
44
commons.go
Normal file
44
commons.go
Normal file
@ -0,0 +1,44 @@
|
||||
package kubeprobes
|
||||
|
||||
import "sync"
|
||||
|
||||
// ProbeFunction is a function that determines whether
|
||||
// the given metric may be marked as correctly functioning.
|
||||
// It not, the error should be returned.
|
||||
type ProbeFunction func() error
|
||||
|
||||
type statusQuery struct {
|
||||
allGreen bool
|
||||
mux sync.Mutex
|
||||
wg sync.WaitGroup
|
||||
}
|
||||
|
||||
func (sq *statusQuery) isAllGreen() bool {
|
||||
sq.wg.Wait()
|
||||
sq.mux.Lock()
|
||||
defer sq.mux.Unlock()
|
||||
return sq.allGreen
|
||||
}
|
||||
|
||||
func newStatusQuery(probes []ProbeFunction) *statusQuery {
|
||||
sq := &statusQuery{
|
||||
allGreen: true,
|
||||
mux: sync.Mutex{},
|
||||
wg: sync.WaitGroup{},
|
||||
}
|
||||
|
||||
sq.wg.Add(len(probes))
|
||||
for _, probe := range probes {
|
||||
probe := probe
|
||||
go func() {
|
||||
defer sq.wg.Done()
|
||||
if err := probe(); err != nil {
|
||||
sq.mux.Lock()
|
||||
sq.allGreen = false
|
||||
sq.mux.Unlock()
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
return sq
|
||||
}
|
45
commons_test.go
Normal file
45
commons_test.go
Normal file
@ -0,0 +1,45 @@
|
||||
package kubeprobes
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestStatusQueryIsAllGreen(t *testing.T) {
|
||||
tests := map[string]struct {
|
||||
probes []ProbeFunction
|
||||
expectedStatus bool
|
||||
}{
|
||||
"all green": {
|
||||
probes: []ProbeFunction{
|
||||
func() error { return nil },
|
||||
func() error { time.Sleep(2 * time.Second); return nil },
|
||||
},
|
||||
expectedStatus: true,
|
||||
},
|
||||
"some failed": {
|
||||
probes: []ProbeFunction{
|
||||
func() error { return nil },
|
||||
func() error { time.Sleep(2 * time.Second); return errors.New("failed") },
|
||||
},
|
||||
expectedStatus: false,
|
||||
},
|
||||
"all failed": {
|
||||
probes: []ProbeFunction{
|
||||
func() error { return errors.New("failed") },
|
||||
func() error { time.Sleep(2 * time.Second); return errors.New("failed") },
|
||||
},
|
||||
expectedStatus: false,
|
||||
},
|
||||
}
|
||||
|
||||
for name, test := range tests {
|
||||
t.Run(name, func(t *testing.T) {
|
||||
sq := newStatusQuery(test.probes)
|
||||
if sq.isAllGreen() != test.expectedStatus {
|
||||
t.Errorf("expected status %v, got %v", test.expectedStatus, sq.isAllGreen())
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
62
probes.go
Normal file
62
probes.go
Normal file
@ -0,0 +1,62 @@
|
||||
package kubeprobes
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
)
|
||||
|
||||
type kubeprobes struct {
|
||||
livenessProbes []ProbeFunction
|
||||
readinessProbes []ProbeFunction
|
||||
}
|
||||
|
||||
// ServeHTTP implements http.Handler interface
|
||||
func (kp *kubeprobes) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
switch r.URL.Path {
|
||||
case "/live":
|
||||
sq := newStatusQuery(kp.livenessProbes)
|
||||
if sq.isAllGreen() {
|
||||
w.WriteHeader(http.StatusOK)
|
||||
} else {
|
||||
w.WriteHeader(http.StatusServiceUnavailable)
|
||||
}
|
||||
case "/ready":
|
||||
sq := newStatusQuery(append(kp.livenessProbes, kp.readinessProbes...))
|
||||
if sq.isAllGreen() {
|
||||
w.WriteHeader(http.StatusOK)
|
||||
} else {
|
||||
w.WriteHeader(http.StatusServiceUnavailable)
|
||||
}
|
||||
default:
|
||||
w.WriteHeader(http.StatusNotFound)
|
||||
}
|
||||
}
|
||||
|
||||
type option func(*kubeprobes)
|
||||
|
||||
// New returns a new instance of a Kubernetes probes
|
||||
func New(options ...option) *kubeprobes {
|
||||
kp := &kubeprobes{
|
||||
livenessProbes: []ProbeFunction{},
|
||||
readinessProbes: []ProbeFunction{},
|
||||
}
|
||||
|
||||
for _, option := range options {
|
||||
option(kp)
|
||||
}
|
||||
|
||||
return kp
|
||||
}
|
||||
|
||||
// WithLivenessProbes adds given liveness probes to the set of probes
|
||||
func WithLivenessProbes(probes ...ProbeFunction) option {
|
||||
return func(kp *kubeprobes) {
|
||||
kp.livenessProbes = append(kp.livenessProbes, probes...)
|
||||
}
|
||||
}
|
||||
|
||||
// WithReadinessProbes adds given readiness probes to the set of probes
|
||||
func WithReadinessProbes(probes ...ProbeFunction) option {
|
||||
return func(kp *kubeprobes) {
|
||||
kp.readinessProbes = append(kp.readinessProbes, probes...)
|
||||
}
|
||||
}
|
83
probes_test.go
Normal file
83
probes_test.go
Normal file
@ -0,0 +1,83 @@
|
||||
package kubeprobes
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func getStatusFromEndpoint(t *testing.T, client *http.Client, endpoint string) int {
|
||||
t.Helper()
|
||||
resp, err := client.Get(endpoint)
|
||||
if err != nil {
|
||||
t.Errorf("error getting status from endpoint: %s", err)
|
||||
}
|
||||
return resp.StatusCode
|
||||
}
|
||||
|
||||
func TestKubeprobes(t *testing.T) {
|
||||
live, ready := NewStatefulProbe(), NewStatefulProbe()
|
||||
|
||||
tests := map[string]struct {
|
||||
livenessProbeTransformation func(*testing.T, *StatefulProbe)
|
||||
readinessProbeTransformation func(*testing.T, *StatefulProbe)
|
||||
expectedLiveStatus int
|
||||
expectedReadyStatus int
|
||||
}{
|
||||
"not live": {
|
||||
livenessProbeTransformation: markAsDown,
|
||||
readinessProbeTransformation: markAsDown,
|
||||
expectedLiveStatus: http.StatusServiceUnavailable,
|
||||
expectedReadyStatus: http.StatusServiceUnavailable,
|
||||
},
|
||||
"live but not ready": {
|
||||
livenessProbeTransformation: markAsUp,
|
||||
readinessProbeTransformation: markAsDown,
|
||||
expectedLiveStatus: http.StatusOK,
|
||||
expectedReadyStatus: http.StatusServiceUnavailable,
|
||||
},
|
||||
"live and ready": {
|
||||
livenessProbeTransformation: markAsUp,
|
||||
readinessProbeTransformation: markAsUp,
|
||||
expectedLiveStatus: http.StatusOK,
|
||||
expectedReadyStatus: http.StatusOK,
|
||||
},
|
||||
"ready but not live - should never happen": {
|
||||
livenessProbeTransformation: markAsDown,
|
||||
readinessProbeTransformation: markAsUp,
|
||||
expectedLiveStatus: http.StatusServiceUnavailable,
|
||||
expectedReadyStatus: http.StatusServiceUnavailable,
|
||||
},
|
||||
}
|
||||
|
||||
kp := New(
|
||||
WithLivenessProbes(live.GetProbeFunction()),
|
||||
WithReadinessProbes(ready.GetProbeFunction()),
|
||||
)
|
||||
|
||||
srv := httptest.NewServer(kp)
|
||||
defer srv.Close()
|
||||
client := srv.Client()
|
||||
|
||||
for name, test := range tests {
|
||||
name, test := name, test
|
||||
t.Run(name, func(t *testing.T) {
|
||||
test.livenessProbeTransformation(t, live)
|
||||
test.readinessProbeTransformation(t, ready)
|
||||
|
||||
liveStatus := getStatusFromEndpoint(t, client, srv.URL+"/live")
|
||||
readyStatus := getStatusFromEndpoint(t, client, srv.URL+"/ready")
|
||||
otherStatus := getStatusFromEndpoint(t, client, srv.URL+"/something")
|
||||
|
||||
if liveStatus != test.expectedLiveStatus {
|
||||
t.Errorf("expected live status %d, got %d", test.expectedLiveStatus, liveStatus)
|
||||
}
|
||||
if readyStatus != test.expectedReadyStatus {
|
||||
t.Errorf("expected ready status %d, got %d", test.expectedReadyStatus, readyStatus)
|
||||
}
|
||||
if otherStatus != http.StatusNotFound {
|
||||
t.Errorf("expected 404 status, got %d", otherStatus)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
52
stateful_probe.go
Normal file
52
stateful_probe.go
Normal file
@ -0,0 +1,52 @@
|
||||
package kubeprobes
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"sync"
|
||||
)
|
||||
|
||||
var errProbeDown = errors.New("DOWN")
|
||||
|
||||
// StatefulProbe represents the simple probe that can be either
|
||||
// marked as "up" (healthy) or "down" (unhealthy).
|
||||
type StatefulProbe struct {
|
||||
status bool
|
||||
mux sync.Mutex
|
||||
}
|
||||
|
||||
// NewStatefulProbe returns a new instance of a stateful probe
|
||||
// which can be either marked as "up" (healthy) or "down" (unhealthy).
|
||||
// The probe is initially marked as "down".
|
||||
func NewStatefulProbe() *StatefulProbe {
|
||||
return &StatefulProbe{
|
||||
status: false,
|
||||
mux: sync.Mutex{},
|
||||
}
|
||||
}
|
||||
|
||||
// MarkAsUp marks the probe as healthy
|
||||
func (sp *StatefulProbe) MarkAsUp() {
|
||||
sp.mux.Lock()
|
||||
defer sp.mux.Unlock()
|
||||
sp.status = true
|
||||
}
|
||||
|
||||
// MarkAsDown marks the probe as unhealthy
|
||||
func (sp *StatefulProbe) MarkAsDown() {
|
||||
sp.mux.Lock()
|
||||
defer sp.mux.Unlock()
|
||||
sp.status = false
|
||||
}
|
||||
|
||||
// GetProbeFunction returns a function that can be used to check
|
||||
// whether the probe is healthy or not.
|
||||
func (sp *StatefulProbe) GetProbeFunction() ProbeFunction {
|
||||
return func() error {
|
||||
sp.mux.Lock()
|
||||
defer sp.mux.Unlock()
|
||||
if sp.status {
|
||||
return nil
|
||||
}
|
||||
return errProbeDown
|
||||
}
|
||||
}
|
42
stateful_probe_test.go
Normal file
42
stateful_probe_test.go
Normal file
@ -0,0 +1,42 @@
|
||||
package kubeprobes
|
||||
|
||||
import "testing"
|
||||
|
||||
var (
|
||||
markAsDown func(*testing.T, *StatefulProbe) = func(t *testing.T, sp *StatefulProbe) {
|
||||
t.Helper()
|
||||
sp.MarkAsDown()
|
||||
}
|
||||
markAsUp func(*testing.T, *StatefulProbe) = func(t *testing.T, sp *StatefulProbe) {
|
||||
t.Helper()
|
||||
sp.MarkAsUp()
|
||||
}
|
||||
)
|
||||
|
||||
func TestStatefulProbe(t *testing.T) {
|
||||
tests := map[string]struct {
|
||||
probeTransformation func(*testing.T, *StatefulProbe)
|
||||
expectedError bool
|
||||
}{
|
||||
"mark as up": {
|
||||
probeTransformation: markAsUp,
|
||||
expectedError: false,
|
||||
},
|
||||
"mark as down": {
|
||||
probeTransformation: markAsDown,
|
||||
expectedError: true,
|
||||
},
|
||||
}
|
||||
|
||||
for name, test := range tests {
|
||||
name, test := name, test
|
||||
t.Run(name, func(t *testing.T) {
|
||||
sp := NewStatefulProbe()
|
||||
test.probeTransformation(t, sp)
|
||||
probeFunc := sp.GetProbeFunction()
|
||||
if (probeFunc() != nil) != test.expectedError {
|
||||
t.Error("result not as expected")
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user