Merge pull request 'feat(probes)!: rewrite probes logic with named probes' (#9) from naming into devel
Reviewed-on: #9
This commit is contained in:
commit
ff3db1ea10
56
README.md
56
README.md
@ -12,43 +12,43 @@ go get -u pkg.icikowski.pl/kubeprobes
|
||||
|
||||
The package provides `kubeprobes.New` function which returns a probes handler of type `kubeprobes.Kubeprobes`, which is compliant with `http.Handler` interface.
|
||||
|
||||
The handler serves two endpoints, which are used to implement liveness and readiness probes by returning either `200` (healthy) or `503` (unhealthy) status:
|
||||
The handler serves two endpoints, which are used to implement liveness and readiness probes by returning either `200` (healthy) or `503` (unhealthy) status and JSON response with probes results:
|
||||
|
||||
- `/live` - endpoint for liveness probe;
|
||||
- `/ready` - endpoint for readiness probe.
|
||||
|
||||
Default paths can be overriden with options described below. Accessing any other endpoint will return `404` status. In order to provide maximum performance, no body is ever returned.
|
||||
Default paths can be overriden with options described below. Accessing any other endpoint will return `404` status. By default, response body only contains a list of failed probes, but this behavior can be changed with provided option or by adding `?v` query parameter.
|
||||
|
||||
The `kubeprobes.New` function accepts following options as arguments:
|
||||
|
||||
- `kubeprobes.WithLivenessProbes(...)` - adds particular [probe functions](#probe-functions) to the list of liveness probes;
|
||||
- `kubeprobes.WithLivenessStatefulProbes(...)` - adds particular [`StatefulProbe`s](#stateful-probes) to the list of liveness probes;
|
||||
- `kubeprobes.WithLivenessPath("/some/liveness/path")` - sets liveness probe path to given path (default is `/live`);
|
||||
- `kubeprobes.WithReadinessProbes(...)` - adds particular [probe functions](#probe-functions) to the list of readiness probes;
|
||||
- `kubeprobes.WithReadinessStatefulProbes(...)` - adds particular [`StatefulProbe`s](#stateful-probes) to the list of readiness probes;
|
||||
- `kubeprobes.WithReadinessPath("/some/readiness/path")` - sets readiness probe path to given path (default is `/ready`).
|
||||
- `kubeprobes.WithReadinessPath("/some/readiness/path")` - sets readiness probe path to given path (default is `/ready`);
|
||||
- `kubeprobes.WithVerboseOutput()` - enables verbose output by default (returns both failed and passed probes).
|
||||
|
||||
## Probes
|
||||
|
||||
In order to determine the state of particular element of application, probes need to be implemented either by creating [status determining function](#probe-functions) or by using simple and thread-safe [stateful probes](#stateful-probes).
|
||||
In order to determine the state of particular element of application, probes need to be implemented either by creating [status determining function](#probe-functions) or by using simple and thread-safe [manual probes](#manual-probes).
|
||||
|
||||
### Probe functions
|
||||
|
||||
Probe functions (objects of type `ProbeFunction`) are functions that performs user defined logic in order to determine whether the probe should be marked as healthy or not. Those functions should take no arguments and return error (if no error is returned, the probe is considered to be healthy; if error is returned, the probe is considered to be unhealthy).
|
||||
Probe functions (instances of `ProbeFunction` interface) are wrappers for functions that performs user defined logic with given interval of updates in order to determine whether the probe should be marked as healthy or not. Those functions should take no arguments and return error (if no error is returned, the probe is considered to be healthy; if error is returned, the probe is considered to be unhealthy). If given interval is less or equal zero, then function is only checked on probe creation and remains in determined state forever.
|
||||
|
||||
```go
|
||||
someProbe := func() error {
|
||||
someProbe := kubeprobes.NewProbeFunction("live", func() error {
|
||||
// Some logic here
|
||||
if somethingIsWrong {
|
||||
return errors.New("something is wrong")
|
||||
if time.Now().Weekday() == time.Wednesday {
|
||||
// Fail only on wednesday!
|
||||
return errors.New("It's wednesday, my dudes!")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
}, 1 * time.Hour)
|
||||
|
||||
someOtherProbe := func() error {
|
||||
someOtherProbe := kubeprobes.NewProbeFunction("ready", func() error {
|
||||
// Always healthy
|
||||
return nil
|
||||
}
|
||||
}, 0) // This probe is checked once
|
||||
|
||||
// Use functions in probes handler
|
||||
kp, _ := kubeprobes.New(
|
||||
@ -57,20 +57,28 @@ kp, _ := kubeprobes.New(
|
||||
)
|
||||
```
|
||||
|
||||
### Stateful probes
|
||||
### Manual probes
|
||||
|
||||
Stateful probes (objects of type `StatefulProbe`) are objects that can be marked either as "up" (healthy) or "down" (unhealthy) and provide a `ProbeFunction` for easy integration. Those objects utilize `sync.Mutex` mechanism to provide thread-safety.
|
||||
Manual probes (instances of `ManualProbe` interface) are objects that can be marked either as healthy or unhealthy and implement `ProbeFunction` for easy integration. Those objects utilize `sync.RMutex` mechanism to ensure thread-safety.
|
||||
|
||||
Those probes can be changed by user with provided methods:
|
||||
|
||||
- `Pass()` marks probe as healthy;
|
||||
- `Fail()` marks probe as unhealthy with generic cause;
|
||||
- `FailWithCause(someError)` marks probe as unhealthy with given error as cause.
|
||||
|
||||
```go
|
||||
// Unhealthy by default
|
||||
someProbe := kubeprobes.NewStatefulProbe()
|
||||
someOtherProbe := kubeprobes.NewStatefulProbe()
|
||||
someProbe := kubeprobes.NewManualProbe("live")
|
||||
someOtherProbe := kubeprobes.NewManualProbe("ready")
|
||||
|
||||
// Use it in probes handler
|
||||
kp, _ := kubeprobes.New(
|
||||
kubeprobes.WithLivenessStatefulProbes(someProbe),
|
||||
kubeprobes.WithReadinessStatefulProbes(someOtherProbe),
|
||||
kubeprobes.WithLivenessProbes(someProbe),
|
||||
kubeprobes.WithReadinessProbes(someOtherProbe),
|
||||
)
|
||||
|
||||
// Can be later marked according
|
||||
```
|
||||
|
||||
## Direct handler access
|
||||
@ -103,11 +111,11 @@ ready := kubeprobes.NewStatefulProbe()
|
||||
|
||||
// Prepare handler
|
||||
kp, err := kubeprobes.New(
|
||||
kubeprobes.WithLivenessStatefulProbes(live),
|
||||
kubeprobes.WithReadinessStatefulProbes(ready),
|
||||
kubeprobes.WithReadinessProbes(appProbe),
|
||||
kubeprobes.WithLivenessProbes(live),
|
||||
kubeprobes.WithReadinessProbes(ready, appProbe),
|
||||
kubeprobes.WithLivenessPath("/livez"),
|
||||
kubeprobes.WithReadinessPath("/readyz"),
|
||||
kubeprobes.WithVerboseOutput(),
|
||||
)
|
||||
if err != nil {
|
||||
// Kubeprobes object is validated for invalid or conflicting paths! ;)
|
||||
@ -122,6 +130,6 @@ probes := &http.Server{
|
||||
go probes.ListenAndServe()
|
||||
|
||||
// Mark probes as healthy
|
||||
live.MarkAsUp()
|
||||
ready.MarkAsUp()
|
||||
live.Pass()
|
||||
ready.Pass()
|
||||
```
|
||||
|
14
costants.go
14
costants.go
@ -1,7 +1,21 @@
|
||||
package kubeprobes
|
||||
|
||||
import (
|
||||
"errors"
|
||||
)
|
||||
|
||||
const (
|
||||
defaultLivenessPath string = "/live"
|
||||
defaultReadinessPath string = "/ready"
|
||||
verboseOutputFlag string = "v"
|
||||
)
|
||||
|
||||
const (
|
||||
headerContentType string = "Content-Type"
|
||||
contentTypeJSON string = "application/json"
|
||||
)
|
||||
|
||||
var (
|
||||
errProbeNameEmpty error = errors.New("probe name must not be empty")
|
||||
errProbeFailed error = errors.New("probe marked as failed")
|
||||
)
|
||||
|
@ -1,16 +1,29 @@
|
||||
package kubeprobes
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"strings"
|
||||
)
|
||||
|
||||
// Kubeprobes represents liveness & readiness probes handler.
|
||||
type Kubeprobes interface {
|
||||
http.Handler
|
||||
|
||||
// LivenessHandler returns [http.Handler] for liveness probes.
|
||||
LivenessHandler() http.Handler
|
||||
// ReadinessHandler returns [http.Handler] for readiness probes.
|
||||
ReadinessHandler() http.Handler
|
||||
}
|
||||
|
||||
type kubeprobes struct {
|
||||
livenessProbes []ProbeFunction
|
||||
readinessProbes []ProbeFunction
|
||||
|
||||
verbose bool
|
||||
|
||||
pathLive string
|
||||
pathReady string
|
||||
}
|
||||
@ -20,8 +33,8 @@ func New(options ...Option) (Kubeprobes, error) {
|
||||
kp := &kubeprobes{
|
||||
livenessProbes: []ProbeFunction{},
|
||||
readinessProbes: []ProbeFunction{},
|
||||
pathLive: "/live",
|
||||
pathReady: "/ready",
|
||||
pathLive: defaultLivenessPath,
|
||||
pathReady: defaultReadinessPath,
|
||||
}
|
||||
|
||||
for _, option := range options {
|
||||
@ -36,6 +49,7 @@ func New(options ...Option) (Kubeprobes, error) {
|
||||
|
||||
func (kp *kubeprobes) validate() error {
|
||||
var err error
|
||||
|
||||
if kp.pathLive == "" {
|
||||
err = errors.Join(err, fmt.Errorf("liveness probe path must not be empty"))
|
||||
}
|
||||
@ -67,22 +81,47 @@ func (kp *kubeprobes) validate() error {
|
||||
return err
|
||||
}
|
||||
|
||||
func (kp *kubeprobes) handleLiveness(w http.ResponseWriter, _ *http.Request) {
|
||||
sq := newStatusQuery(kp.livenessProbes)
|
||||
if sq.isAllGreen() {
|
||||
w.WriteHeader(http.StatusOK)
|
||||
} else {
|
||||
w.WriteHeader(http.StatusServiceUnavailable)
|
||||
}
|
||||
type probesResponse struct {
|
||||
Passed []statusEntry `json:"passed,omitempty"`
|
||||
Failed []statusEntry `json:"failed,omitempty"`
|
||||
}
|
||||
|
||||
func (kp *kubeprobes) handleReadiness(w http.ResponseWriter, _ *http.Request) {
|
||||
sq := newStatusQuery(append(kp.livenessProbes, kp.readinessProbes...))
|
||||
if sq.isAllGreen() {
|
||||
func (kp *kubeprobes) handleLiveness(w http.ResponseWriter, r *http.Request) {
|
||||
sq := newStatusQuery(kp.livenessProbes)
|
||||
output := probesResponse{}
|
||||
|
||||
sq.wait()
|
||||
output.Failed = sq.failed
|
||||
if r.URL.Query().Has(verboseOutputFlag) || kp.verbose {
|
||||
output.Passed = sq.passed
|
||||
}
|
||||
|
||||
w.Header().Add(headerContentType, contentTypeJSON)
|
||||
if sq.ok {
|
||||
w.WriteHeader(http.StatusOK)
|
||||
} else {
|
||||
w.WriteHeader(http.StatusServiceUnavailable)
|
||||
}
|
||||
_ = json.NewEncoder(w).Encode(output)
|
||||
}
|
||||
|
||||
func (kp *kubeprobes) handleReadiness(w http.ResponseWriter, r *http.Request) {
|
||||
sq := newStatusQuery(append(kp.livenessProbes, kp.readinessProbes...))
|
||||
output := probesResponse{}
|
||||
|
||||
sq.wait()
|
||||
output.Failed = sq.failed
|
||||
if r.URL.Query().Has(verboseOutputFlag) || kp.verbose {
|
||||
output.Passed = sq.passed
|
||||
}
|
||||
|
||||
w.Header().Add(headerContentType, contentTypeJSON)
|
||||
if sq.ok {
|
||||
w.WriteHeader(http.StatusOK)
|
||||
} else {
|
||||
w.WriteHeader(http.StatusServiceUnavailable)
|
||||
}
|
||||
_ = json.NewEncoder(w).Encode(output)
|
||||
}
|
||||
|
||||
// LivenessHandler implements Kubeprobes.
|
47
kubeprobes_options.go
Normal file
47
kubeprobes_options.go
Normal file
@ -0,0 +1,47 @@
|
||||
package kubeprobes
|
||||
|
||||
// Option represents a [Kubeprobes] constructor option.
|
||||
type Option interface {
|
||||
apply(kp *kubeprobes)
|
||||
}
|
||||
|
||||
type option func(*kubeprobes)
|
||||
|
||||
func (o option) apply(kp *kubeprobes) {
|
||||
o(kp)
|
||||
}
|
||||
|
||||
// WithLivenessProbes adds given probe functions to the set of liveness probes.
|
||||
func WithLivenessProbes(probes ...ProbeFunction) Option {
|
||||
return option(func(kp *kubeprobes) {
|
||||
kp.livenessProbes = append(kp.livenessProbes, probes...)
|
||||
})
|
||||
}
|
||||
|
||||
// WithLivenessPath sets custom path for liveness probe (default is "/live").
|
||||
func WithLivenessPath(path string) Option {
|
||||
return option(func(kp *kubeprobes) {
|
||||
kp.pathLive = path
|
||||
})
|
||||
}
|
||||
|
||||
// WithReadinessProbes adds given probe functions to the set of readiness probes.
|
||||
func WithReadinessProbes(probes ...ProbeFunction) Option {
|
||||
return option(func(kp *kubeprobes) {
|
||||
kp.readinessProbes = append(kp.readinessProbes, probes...)
|
||||
})
|
||||
}
|
||||
|
||||
// WithReadinessPath sets custom path for readiness probe (default is "/ready").
|
||||
func WithReadinessPath(path string) Option {
|
||||
return option(func(kp *kubeprobes) {
|
||||
kp.pathReady = path
|
||||
})
|
||||
}
|
||||
|
||||
// WithVerboseOutput enables verbose output for every request.
|
||||
func WithVerboseOutput() Option {
|
||||
return option(func(kp *kubeprobes) {
|
||||
kp.verbose = true
|
||||
})
|
||||
}
|
@ -17,8 +17,8 @@ func getStatusFromEndpoint(t *testing.T, client *http.Client, endpoint string) i
|
||||
|
||||
func TestValidation(t *testing.T) {
|
||||
var (
|
||||
live = NewStatefulProbe()
|
||||
ready = NewStatefulProbe()
|
||||
live, _ = NewManualProbe("live")
|
||||
ready, _ = NewManualProbe("ready")
|
||||
)
|
||||
|
||||
tests := map[string]struct {
|
||||
@ -27,66 +27,66 @@ func TestValidation(t *testing.T) {
|
||||
}{
|
||||
"no modifications and no error": {
|
||||
opts: []Option{
|
||||
WithLivenessStatefulProbes(live),
|
||||
WithReadinessStatefulProbes(ready),
|
||||
WithLivenessProbes(live),
|
||||
WithReadinessProbes(ready),
|
||||
},
|
||||
},
|
||||
"modifications and no error": {
|
||||
opts: []Option{
|
||||
WithLivenessStatefulProbes(live),
|
||||
WithReadinessStatefulProbes(ready),
|
||||
WithLivenessProbes(live),
|
||||
WithReadinessProbes(ready),
|
||||
WithLivenessPath("/livez"),
|
||||
WithReadinessPath("/readyz"),
|
||||
},
|
||||
},
|
||||
"missing liveness probes": {
|
||||
opts: []Option{
|
||||
WithReadinessStatefulProbes(ready),
|
||||
WithReadinessProbes(ready),
|
||||
},
|
||||
expectedError: true,
|
||||
},
|
||||
"missing readiness probes": {
|
||||
opts: []Option{
|
||||
WithLivenessStatefulProbes(live),
|
||||
WithLivenessProbes(live),
|
||||
},
|
||||
expectedError: true,
|
||||
},
|
||||
"liveness probe path empty": {
|
||||
opts: []Option{
|
||||
WithLivenessStatefulProbes(live),
|
||||
WithReadinessStatefulProbes(ready),
|
||||
WithLivenessProbes(live),
|
||||
WithReadinessProbes(ready),
|
||||
WithLivenessPath(""),
|
||||
},
|
||||
expectedError: true,
|
||||
},
|
||||
"readiness probe path empty": {
|
||||
opts: []Option{
|
||||
WithLivenessStatefulProbes(live),
|
||||
WithReadinessStatefulProbes(ready),
|
||||
WithLivenessProbes(live),
|
||||
WithReadinessProbes(ready),
|
||||
WithReadinessPath(""),
|
||||
},
|
||||
expectedError: true,
|
||||
},
|
||||
"liveness probe path does not start with slash": {
|
||||
opts: []Option{
|
||||
WithLivenessStatefulProbes(live),
|
||||
WithReadinessStatefulProbes(ready),
|
||||
WithLivenessProbes(live),
|
||||
WithReadinessProbes(ready),
|
||||
WithLivenessPath("livez"),
|
||||
},
|
||||
expectedError: true,
|
||||
},
|
||||
"readiness probe path does not start with slash": {
|
||||
opts: []Option{
|
||||
WithLivenessStatefulProbes(live),
|
||||
WithReadinessStatefulProbes(ready),
|
||||
WithLivenessProbes(live),
|
||||
WithReadinessProbes(ready),
|
||||
WithReadinessPath("readyz"),
|
||||
},
|
||||
expectedError: true,
|
||||
},
|
||||
"liveness and readiness probe paths are equal": {
|
||||
opts: []Option{
|
||||
WithLivenessStatefulProbes(live),
|
||||
WithReadinessStatefulProbes(ready),
|
||||
WithLivenessProbes(live),
|
||||
WithReadinessProbes(ready),
|
||||
WithLivenessPath("/check"),
|
||||
WithReadinessPath("/check"),
|
||||
},
|
||||
@ -109,11 +109,14 @@ func TestValidation(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestHandler(t *testing.T) {
|
||||
live, ready := NewStatefulProbe(), NewStatefulProbe()
|
||||
var (
|
||||
live, _ = NewManualProbe("live")
|
||||
ready, _ = NewManualProbe("ready")
|
||||
)
|
||||
|
||||
tests := map[string]struct {
|
||||
livenessProbeTransformation func(*testing.T, *StatefulProbe)
|
||||
readinessProbeTransformation func(*testing.T, *StatefulProbe)
|
||||
livenessProbeTransformation func(*testing.T, ManualProbe)
|
||||
readinessProbeTransformation func(*testing.T, ManualProbe)
|
||||
expectedLiveStatus int
|
||||
expectedReadyStatus int
|
||||
}{
|
||||
@ -144,8 +147,8 @@ func TestHandler(t *testing.T) {
|
||||
}
|
||||
|
||||
kp, err := New(
|
||||
WithLivenessStatefulProbes(live),
|
||||
WithReadinessStatefulProbes(ready),
|
||||
WithLivenessProbes(live),
|
||||
WithReadinessProbes(ready),
|
||||
)
|
||||
if err != nil {
|
||||
t.Errorf("expected no error, got %v", err)
|
67
options.go
67
options.go
@ -1,67 +0,0 @@
|
||||
package kubeprobes
|
||||
|
||||
type option struct {
|
||||
fn func(*kubeprobes)
|
||||
}
|
||||
|
||||
func (o *option) apply(kp *kubeprobes) {
|
||||
o.fn(kp)
|
||||
}
|
||||
|
||||
// WithLivenessProbes adds given probe functions to the set of liveness probes.
|
||||
func WithLivenessProbes(probes ...ProbeFunction) Option {
|
||||
return &option{
|
||||
fn: func(kp *kubeprobes) {
|
||||
kp.livenessProbes = append(kp.livenessProbes, probes...)
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// WithLivenessStatefulProbes adds given [StatefulProbe]s to the set of liveness probes.
|
||||
func WithLivenessStatefulProbes(probes ...*StatefulProbe) Option {
|
||||
return &option{
|
||||
fn: func(kp *kubeprobes) {
|
||||
for _, p := range probes {
|
||||
kp.livenessProbes = append(kp.livenessProbes, p.GetProbeFunction())
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// WithLivenessPath sets custom path for liveness probe (default is "/live").
|
||||
func WithLivenessPath(path string) Option {
|
||||
return &option{
|
||||
fn: func(kp *kubeprobes) {
|
||||
kp.pathLive = path
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// WithReadinessProbes adds given probe functions to the set of readiness probes.
|
||||
func WithReadinessProbes(probes ...ProbeFunction) Option {
|
||||
return &option{
|
||||
fn: func(kp *kubeprobes) {
|
||||
kp.readinessProbes = append(kp.readinessProbes, probes...)
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// WithReadinessProbes adds given [StatefulProbe]s to the set of readiness probes.
|
||||
func WithReadinessStatefulProbes(probes ...*StatefulProbe) Option {
|
||||
return &option{
|
||||
fn: func(kp *kubeprobes) {
|
||||
for _, p := range probes {
|
||||
kp.readinessProbes = append(kp.readinessProbes, p.GetProbeFunction())
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// WithReadinessPath sets custom path for readiness probe (default is "/ready").
|
||||
func WithReadinessPath(path string) Option {
|
||||
return &option{
|
||||
fn: func(kp *kubeprobes) {
|
||||
kp.pathReady = path
|
||||
},
|
||||
}
|
||||
}
|
81
probe_function.go
Normal file
81
probe_function.go
Normal file
@ -0,0 +1,81 @@
|
||||
package kubeprobes
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
// ProbeFunction is a wrapper for a function that determines whether
|
||||
// the given metric may be marked as correctly functioning.
|
||||
// It not, the error should be returned.
|
||||
type ProbeFunction interface {
|
||||
name() string
|
||||
status() error
|
||||
}
|
||||
|
||||
type probeFunction struct {
|
||||
probeName string
|
||||
probeFunc func() error
|
||||
refreshInterval time.Duration
|
||||
|
||||
mux sync.RWMutex
|
||||
err error
|
||||
}
|
||||
|
||||
// NewProbeFunction returns new instance of [ProbeFunction].
|
||||
//
|
||||
// If update interval is less or equal zero then probe is updated only
|
||||
// on its creation and remains in the same state forever.
|
||||
func NewProbeFunction(
|
||||
name string,
|
||||
fn func() error,
|
||||
updateInterval time.Duration,
|
||||
) (ProbeFunction, error) {
|
||||
if name == "" {
|
||||
return nil, errProbeNameEmpty
|
||||
}
|
||||
|
||||
pf := &probeFunction{
|
||||
probeName: name,
|
||||
probeFunc: fn,
|
||||
refreshInterval: updateInterval,
|
||||
mux: sync.RWMutex{},
|
||||
}
|
||||
|
||||
defer pf.autoUpdate()
|
||||
return pf, nil
|
||||
}
|
||||
|
||||
// name implements ProbeFunction.
|
||||
func (pf *probeFunction) name() string {
|
||||
return pf.probeName
|
||||
}
|
||||
|
||||
// status implements ProbeFunction.
|
||||
func (pf *probeFunction) status() error {
|
||||
pf.mux.RLock()
|
||||
defer pf.mux.RUnlock()
|
||||
return pf.err
|
||||
}
|
||||
|
||||
func (pf *probeFunction) update() {
|
||||
err := pf.probeFunc()
|
||||
pf.mux.Lock()
|
||||
pf.err = err
|
||||
pf.mux.Unlock()
|
||||
}
|
||||
|
||||
func (pf *probeFunction) autoUpdate() {
|
||||
pf.update()
|
||||
if pf.refreshInterval <= 0 {
|
||||
return
|
||||
}
|
||||
|
||||
go func() {
|
||||
ticker := time.NewTicker(pf.refreshInterval)
|
||||
for {
|
||||
<-ticker.C
|
||||
pf.update()
|
||||
}
|
||||
}()
|
||||
}
|
71
probe_manual.go
Normal file
71
probe_manual.go
Normal file
@ -0,0 +1,71 @@
|
||||
package kubeprobes
|
||||
|
||||
import (
|
||||
"sync"
|
||||
)
|
||||
|
||||
// ManualProbe represents the simple probe that can be either
|
||||
// marked as "up" (healthy) or "down" (unhealthy).
|
||||
type ManualProbe interface {
|
||||
ProbeFunction
|
||||
|
||||
// Pass marks the probe as healthy.
|
||||
Pass()
|
||||
// Fail marks the probe as unhealthy.
|
||||
Fail()
|
||||
// FailWitCause marks the probe as unhealthy with given cause.
|
||||
FailWithCause(err error)
|
||||
}
|
||||
|
||||
type manualProbe struct {
|
||||
probeName string
|
||||
err error
|
||||
mux sync.RWMutex
|
||||
}
|
||||
|
||||
// NewManualProbe returns a new instance of a manual probe
|
||||
// which can be either marked as healthy or unhealthy.
|
||||
// The probe is initially marked as unhealthy.
|
||||
func NewManualProbe(name string) (ManualProbe, error) {
|
||||
if name == "" {
|
||||
return nil, errProbeNameEmpty
|
||||
}
|
||||
|
||||
return &manualProbe{
|
||||
probeName: name,
|
||||
mux: sync.RWMutex{},
|
||||
}, nil
|
||||
}
|
||||
|
||||
// name implements ManualProbe.
|
||||
func (mp *manualProbe) name() string {
|
||||
return mp.probeName
|
||||
}
|
||||
|
||||
// status implements ManualProbe.
|
||||
func (mp *manualProbe) status() error {
|
||||
mp.mux.RLock()
|
||||
defer mp.mux.RUnlock()
|
||||
return mp.err
|
||||
}
|
||||
|
||||
// Pass implements ManualProbe.
|
||||
func (mp *manualProbe) Pass() {
|
||||
mp.mux.Lock()
|
||||
defer mp.mux.Unlock()
|
||||
mp.err = nil
|
||||
}
|
||||
|
||||
// Fail implements ManualProbe.
|
||||
func (mp *manualProbe) Fail() {
|
||||
mp.mux.Lock()
|
||||
defer mp.mux.Unlock()
|
||||
mp.err = errProbeFailed
|
||||
}
|
||||
|
||||
// FailWithCause implements ManualProbe.
|
||||
func (mp *manualProbe) FailWithCause(err error) {
|
||||
mp.mux.Lock()
|
||||
defer mp.mux.Unlock()
|
||||
mp.err = err
|
||||
}
|
46
probe_manual_test.go
Normal file
46
probe_manual_test.go
Normal file
@ -0,0 +1,46 @@
|
||||
package kubeprobes
|
||||
|
||||
import "testing"
|
||||
|
||||
var (
|
||||
markAsDown func(*testing.T, ManualProbe) = func(t *testing.T, sp ManualProbe) {
|
||||
t.Helper()
|
||||
sp.Fail()
|
||||
}
|
||||
markAsUp func(*testing.T, ManualProbe) = func(t *testing.T, sp ManualProbe) {
|
||||
t.Helper()
|
||||
sp.Pass()
|
||||
}
|
||||
)
|
||||
|
||||
func TestManualProbe(t *testing.T) {
|
||||
tests := map[string]struct {
|
||||
probeTransformation func(*testing.T, ManualProbe)
|
||||
expectedError bool
|
||||
}{
|
||||
"mark as up": {
|
||||
probeTransformation: markAsUp,
|
||||
expectedError: false,
|
||||
},
|
||||
"mark as down": {
|
||||
probeTransformation: markAsDown,
|
||||
expectedError: true,
|
||||
},
|
||||
}
|
||||
|
||||
for name, tc := range tests {
|
||||
name, tc := name, tc
|
||||
t.Run(name, func(t *testing.T) {
|
||||
sp, _ := NewManualProbe("some name")
|
||||
tc.probeTransformation(t, sp)
|
||||
err := sp.status()
|
||||
|
||||
switch {
|
||||
case err == nil && tc.expectedError:
|
||||
t.Error("expected error, but no error was returned")
|
||||
case err != nil && !tc.expectedError:
|
||||
t.Errorf("expected no error but got %v", err)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
43
query.go
43
query.go
@ -3,23 +3,30 @@ package kubeprobes
|
||||
import "sync"
|
||||
|
||||
type statusQuery struct {
|
||||
allGreen bool
|
||||
mux sync.Mutex
|
||||
wg sync.WaitGroup
|
||||
ok bool
|
||||
passed []statusEntry
|
||||
failed []statusEntry
|
||||
|
||||
mux sync.Mutex
|
||||
wg sync.WaitGroup
|
||||
}
|
||||
|
||||
func (sq *statusQuery) isAllGreen() bool {
|
||||
type statusEntry struct {
|
||||
Probe string `json:"probe"`
|
||||
Status error `json:"status,omitempty"`
|
||||
}
|
||||
|
||||
func (sq *statusQuery) wait() {
|
||||
sq.wg.Wait()
|
||||
sq.mux.Lock()
|
||||
defer sq.mux.Unlock()
|
||||
return sq.allGreen
|
||||
}
|
||||
|
||||
func newStatusQuery(probes []ProbeFunction) *statusQuery {
|
||||
sq := &statusQuery{
|
||||
allGreen: true,
|
||||
mux: sync.Mutex{},
|
||||
wg: sync.WaitGroup{},
|
||||
ok: true,
|
||||
passed: make([]statusEntry, 0, len(probes)),
|
||||
failed: make([]statusEntry, 0, len(probes)),
|
||||
mux: sync.Mutex{},
|
||||
wg: sync.WaitGroup{},
|
||||
}
|
||||
|
||||
sq.wg.Add(len(probes))
|
||||
@ -27,11 +34,19 @@ func newStatusQuery(probes []ProbeFunction) *statusQuery {
|
||||
probe := probe
|
||||
go func() {
|
||||
defer sq.wg.Done()
|
||||
if err := probe(); err != nil {
|
||||
sq.mux.Lock()
|
||||
sq.allGreen = false
|
||||
sq.mux.Unlock()
|
||||
sq.mux.Lock()
|
||||
if err := probe.status(); err != nil {
|
||||
sq.ok = false
|
||||
sq.failed = append(sq.failed, statusEntry{
|
||||
Probe: probe.name(),
|
||||
Status: err,
|
||||
})
|
||||
} else {
|
||||
sq.passed = append(sq.passed, statusEntry{
|
||||
Probe: probe.name(),
|
||||
})
|
||||
}
|
||||
sq.mux.Unlock()
|
||||
}()
|
||||
}
|
||||
|
||||
|
@ -1,35 +1,33 @@
|
||||
package kubeprobes
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestStatusQueryIsAllGreen(t *testing.T) {
|
||||
var (
|
||||
probePassing, _ = NewProbeFunction("pass", func() error {
|
||||
return nil
|
||||
}, 0)
|
||||
probeFailing, _ = NewProbeFunction("fail", func() error {
|
||||
return errProbeFailed
|
||||
}, 0)
|
||||
)
|
||||
|
||||
tests := map[string]struct {
|
||||
probes []ProbeFunction
|
||||
expectedStatus bool
|
||||
}{
|
||||
"all green": {
|
||||
probes: []ProbeFunction{
|
||||
func() error { return nil },
|
||||
func() error { time.Sleep(2 * time.Second); return nil },
|
||||
},
|
||||
probes: []ProbeFunction{probePassing},
|
||||
expectedStatus: true,
|
||||
},
|
||||
"some failed": {
|
||||
probes: []ProbeFunction{
|
||||
func() error { return nil },
|
||||
func() error { time.Sleep(2 * time.Second); return errors.New("failed") },
|
||||
},
|
||||
probes: []ProbeFunction{probePassing, probeFailing},
|
||||
expectedStatus: false,
|
||||
},
|
||||
"all failed": {
|
||||
probes: []ProbeFunction{
|
||||
func() error { return errors.New("failed") },
|
||||
func() error { time.Sleep(2 * time.Second); return errors.New("failed") },
|
||||
},
|
||||
probes: []ProbeFunction{probeFailing},
|
||||
expectedStatus: false,
|
||||
},
|
||||
}
|
||||
@ -37,8 +35,9 @@ func TestStatusQueryIsAllGreen(t *testing.T) {
|
||||
for name, test := range tests {
|
||||
t.Run(name, func(t *testing.T) {
|
||||
sq := newStatusQuery(test.probes)
|
||||
if sq.isAllGreen() != test.expectedStatus {
|
||||
t.Errorf("expected status %v, got %v", test.expectedStatus, sq.isAllGreen())
|
||||
sq.wait()
|
||||
if sq.ok != test.expectedStatus {
|
||||
t.Errorf("expected status %v, got %v", test.expectedStatus, sq.ok)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
@ -1,52 +0,0 @@
|
||||
package kubeprobes
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"sync"
|
||||
)
|
||||
|
||||
var errProbeDown = errors.New("probe is down")
|
||||
|
||||
// StatefulProbe represents the simple probe that can be either
|
||||
// marked as "up" (healthy) or "down" (unhealthy).
|
||||
type StatefulProbe struct {
|
||||
mux sync.Mutex
|
||||
status bool
|
||||
}
|
||||
|
||||
// NewStatefulProbe returns a new instance of a stateful probe
|
||||
// which can be either marked as "up" (healthy) or "down" (unhealthy).
|
||||
// The probe is initially marked as "down".
|
||||
func NewStatefulProbe() *StatefulProbe {
|
||||
return &StatefulProbe{
|
||||
mux: sync.Mutex{},
|
||||
status: false,
|
||||
}
|
||||
}
|
||||
|
||||
// MarkAsUp marks the probe as healthy.
|
||||
func (sp *StatefulProbe) MarkAsUp() {
|
||||
sp.mux.Lock()
|
||||
defer sp.mux.Unlock()
|
||||
sp.status = true
|
||||
}
|
||||
|
||||
// MarkAsDown marks the probe as unhealthy.
|
||||
func (sp *StatefulProbe) MarkAsDown() {
|
||||
sp.mux.Lock()
|
||||
defer sp.mux.Unlock()
|
||||
sp.status = false
|
||||
}
|
||||
|
||||
// GetProbeFunction returns a function that can be used to check
|
||||
// whether the probe is healthy or not.
|
||||
func (sp *StatefulProbe) GetProbeFunction() ProbeFunction {
|
||||
return func() error {
|
||||
sp.mux.Lock()
|
||||
defer sp.mux.Unlock()
|
||||
if sp.status {
|
||||
return nil
|
||||
}
|
||||
return errProbeDown
|
||||
}
|
||||
}
|
@ -1,42 +0,0 @@
|
||||
package kubeprobes
|
||||
|
||||
import "testing"
|
||||
|
||||
var (
|
||||
markAsDown func(*testing.T, *StatefulProbe) = func(t *testing.T, sp *StatefulProbe) {
|
||||
t.Helper()
|
||||
sp.MarkAsDown()
|
||||
}
|
||||
markAsUp func(*testing.T, *StatefulProbe) = func(t *testing.T, sp *StatefulProbe) {
|
||||
t.Helper()
|
||||
sp.MarkAsUp()
|
||||
}
|
||||
)
|
||||
|
||||
func TestStatefulProbe(t *testing.T) {
|
||||
tests := map[string]struct {
|
||||
probeTransformation func(*testing.T, *StatefulProbe)
|
||||
expectedError bool
|
||||
}{
|
||||
"mark as up": {
|
||||
probeTransformation: markAsUp,
|
||||
expectedError: false,
|
||||
},
|
||||
"mark as down": {
|
||||
probeTransformation: markAsDown,
|
||||
expectedError: true,
|
||||
},
|
||||
}
|
||||
|
||||
for name, test := range tests {
|
||||
name, test := name, test
|
||||
t.Run(name, func(t *testing.T) {
|
||||
sp := NewStatefulProbe()
|
||||
test.probeTransformation(t, sp)
|
||||
probeFunc := sp.GetProbeFunction()
|
||||
if (probeFunc() != nil) != test.expectedError {
|
||||
t.Error("result not as expected")
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
23
types.go
23
types.go
@ -1,23 +0,0 @@
|
||||
package kubeprobes
|
||||
|
||||
import "net/http"
|
||||
|
||||
// Kubeprobes represents liveness & readiness probes handler.
|
||||
type Kubeprobes interface {
|
||||
http.Handler
|
||||
|
||||
// LivenessHandler returns [http.Handler] for liveness probes.
|
||||
LivenessHandler() http.Handler
|
||||
// ReadinessHandler returns [http.Handler] for readiness probes.
|
||||
ReadinessHandler() http.Handler
|
||||
}
|
||||
|
||||
// Option represents a [Kubeprobes] constructor option.
|
||||
type Option interface {
|
||||
apply(kp *kubeprobes)
|
||||
}
|
||||
|
||||
// ProbeFunction is a function that determines whether
|
||||
// the given metric may be marked as correctly functioning.
|
||||
// It not, the error should be returned.
|
||||
type ProbeFunction func() error
|
Loading…
x
Reference in New Issue
Block a user