基于loki-ruler组件实现日志报警
loki有个ruler组件负责日志的报警,会轮询调用loki api执行logql语句判断规则是否触发,loki-ruler将产生的报警事件发送到alertmanager中,我们平台基于loki-ruler封装了对日志报警的增删查
安装
如果是使用helm安装的loki,需要在values文件中将ruler组件的enabled改为true
API
平台侧使用api完成loki告警规则的增删查
Loki地址: 192.168.1.100:3000
查看告警规则
GET 192.168.1.100:3000/loki/api/v1/rules
返回体是yaml格式的,如下:
cloud:
  - name: cloud-errordemo
    rules:
      - alert: cloud-errordemo
        expr: 'sum(count_over_time({job="kubemanage/errordemo",region="cloud"} |~ "(?i)error"[1m])) by (namespace,pod) > 0 '
        for: 1s
        labels:
          kubemanage_alertname: cloud-errordemo
          kubemanage_alertnamespace: kubemanage
          kubemanage_alerttype: "2"
          kubemanage_template: 日志报错error
          kubemanage_tenant: "1"
          namespace: monitoring
        annotations:
          App: errordemo
          Description: 实例:{{$labels}}当前值{{$value}}
          Level: L1
          Namespace: kubemanage
          Region: cloud
          Resources: '{"app":"errordemo","namespace":"kubemanage","region":"cloud"}'
          Template: 日志报错error
创建告警规则
POST 192.168.1.100:3000/loki/api/v1/rules/cloud
请求体为YAML格式的规则组(Content-Type: application/yaml),规则组名称(如cloud-errordemo)写在请求体的name字段中,创建成功时返回202
在平台侧我们以集群标签作为loki规则的命名空间(namespace),告警策略名作为规则组名称
删除单条告警规则
DELETE 192.168.1.100:3000/loki/api/v1/rules/cloud/cloud-errordemo
返回体Json结构:
{
"status": "success",
"data": null,
"errorType": "",
"error": ""
}
源码分析
封装的http client
package client
import (
"bytes"
"context"
"encoding/json"
"io"
"io/ioutil"
"net/http"
"time"
"github.com/noovertime7/kubemanage/pkg/log"
)
// RESTClient is the HTTP helper interface of this package; NewRESTClient
// returns the implementation backed by an *http.Client.
type RESTClient interface {
// GET requests url with the GET method and returns the raw response body.
GET(ctx context.Context, url string) ([]byte, error)
// POSTWithJson JSON-encodes body, POSTs it to url with the
// "application/json" content type, and returns the raw response body.
POSTWithJson(ctx context.Context, url string, body interface{}) ([]byte, error)
// GETBind performs GET and JSON-decodes the response body into bind.
GETBind(ctx context.Context, url string, bind interface{}) error
// POSTWithJsonBind performs POSTWithJson and JSON-decodes the response body into bind.
POSTWithJsonBind(ctx context.Context, url string, body interface{}, bind interface{}) error
// Do sends a request with an arbitrary method and raw body, applying ops to
// the request before it is sent. The response is returned unread.
Do(ctx context.Context, method, url string, body []byte, ops ...RequestOption) (*http.Response, error)
}
// DefaultClient is a package-level HTTP client with a 3-second request timeout.
var DefaultClient = &http.Client{Timeout: 3 * time.Second}
// OK has the same value as the message HttpStatus returns on success.
const OK = "OK"
// HttpStatus reports whether resp carries the expected status code.
//
// The response body is always read to the end and closed. On a match the
// result is (true, "OK"). On a mismatch the result is (false, <body>), so the
// caller can surface the server's message. A read failure yields
// (false, <read error>), and a nil resp yields (false, "nil response")
// instead of panicking.
var HttpStatus = func(resp *http.Response, code int) (bool, string) {
	if resp == nil {
		return false, "nil response"
	}

	var payload []byte
	if resp.Body != nil {
		defer resp.Body.Close()
		data, err := ioutil.ReadAll(resp.Body)
		if err != nil {
			return false, err.Error()
		}
		payload = data
	}

	if resp.StatusCode != code {
		return false, string(payload)
	}
	return true, "OK"
}
// NewRESTClient wraps client in the package's RESTClient implementation.
// Every request made through the returned value is sent with client.
func NewRESTClient(client *http.Client) RESTClient {
	rc := new(restClient)
	rc.client = client
	return rc
}
// restClient is the RESTClient implementation returned by NewRESTClient.
type restClient struct {
// client is the underlying HTTP client used to send every request.
client *http.Client
}
// GET sends an HTTP GET to url and returns the complete response body.
//
// The request is bound to ctx, so cancelling ctx or exceeding its deadline
// aborts the call. A nil ctx is treated as context.Background() so existing
// callers that passed nil keep working. The response status code is not
// inspected; callers receive the body of error responses as well.
func (r *restClient) GET(ctx context.Context, url string) ([]byte, error) {
	if ctx == nil {
		ctx = context.Background()
	}
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
	if err != nil {
		return nil, err
	}
	resp, err := r.client.Do(req)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	payload, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		return nil, err
	}
	return payload, nil
}
// GETBind fetches url with GET and JSON-decodes the response body into bind.
// It returns the transport error from GET or the decoding error, whichever
// occurs first.
func (r *restClient) GETBind(ctx context.Context, url string, bind interface{}) error {
	payload, getErr := r.GET(ctx, url)
	if getErr != nil {
		return getErr
	}
	return json.Unmarshal(payload, bind)
}
// POSTWithJson JSON-encodes body, POSTs it to url with the
// "application/json" content type, and returns the complete response body.
//
// A nil body results in an empty request payload. The request is bound to
// ctx, so cancelling ctx or exceeding its deadline aborts the call. A nil ctx
// is treated as context.Background() so existing callers that passed nil keep
// working. The response status code is not inspected.
func (r *restClient) POSTWithJson(ctx context.Context, url string, body interface{}) ([]byte, error) {
	var encoded []byte
	if body != nil {
		data, err := json.Marshal(body)
		if err != nil {
			return nil, err
		}
		encoded = data
	}

	if ctx == nil {
		ctx = context.Background()
	}
	// Passing the *bytes.Reader directly lets net/http set Content-Length.
	req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(encoded))
	if err != nil {
		return nil, err
	}
	req.Header.Set("Content-Type", "application/json")

	resp, err := r.client.Do(req)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	respBody, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		return nil, err
	}
	return respBody, nil
}
// POSTWithJsonBind sends body as JSON via POSTWithJson and JSON-decodes the
// response body into bind. It returns the first error encountered.
func (r *restClient) POSTWithJsonBind(ctx context.Context, url string, body interface{}, bind interface{}) error {
	payload, postErr := r.POSTWithJson(ctx, url, body)
	if postErr != nil {
		return postErr
	}
	return json.Unmarshal(payload, bind)
}
// RequestOption customizes an outgoing request before it is sent by Do.
// It receives the request and returns a request.
type RequestOption func(req *http.Request) *http.Request
// WithYamlHeader marks the request payload as YAML by setting the
// Content-Type header to "application/yaml". It returns the same request it
// was given.
var WithYamlHeader = func(req *http.Request) *http.Request {
	const (
		headerName = "Content-Type"
		yamlType   = "application/yaml"
	)
	req.Header.Set(headerName, yamlType)
	return req
}
// Do builds a request with the given method, url and raw body, applies ops in
// order, sends it, and returns the unread response.
//
// The caller owns the returned response and must close its Body. When body is
// empty no request body is attached. Otherwise the *bytes.Reader is handed to
// net/http directly so that Content-Length and GetBody are populated. An
// option may return a different request than the one it received, in which
// case the returned request is the one sent. Nil options and nil results are
// ignored.
func (r *restClient) Do(ctx context.Context, method, url string, body []byte, ops ...RequestOption) (*http.Response, error) {
	// Keep the reader as a nil interface when there is no payload, so that
	// bodiless methods such as GET and DELETE do not carry an empty body.
	var payload io.Reader
	if len(body) > 0 {
		payload = bytes.NewReader(body)
	}

	req, err := http.NewRequestWithContext(ctx, method, url, payload)
	if err != nil {
		return nil, err
	}
	log.Debugf("向[%s]发送[%s]请求", url, method)

	for _, op := range ops {
		if op == nil {
			continue
		}
		// Honor the option's return value so options that derive a new
		// request are not silently dropped.
		if updated := op(req); updated != nil {
			req = updated
		}
	}
	return r.client.Do(req)
}
继续封装使用http client调用
package handler
import (
"context"
"fmt"
"io"
"io/ioutil"
"net/http"
monitoringV1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1"
"sigs.k8s.io/yaml"
"github.com/noovertime7/kubemanage/cmd/app/config"
"github.com/noovertime7/kubemanage/dao/model"
"github.com/noovertime7/kubemanage/pkg/log"
"github.com/noovertime7/kubemanage/runtime/client"
)
// Loki groups the operations this package performs against the Loki ruler
// HTTP API. Every method receives the REST client to use for the call.
type Loki interface {
// SyncLokiRule generates a rule group from rule and template and POSTs it to Loki as YAML.
SyncLokiRule(ctx context.Context, c client.RESTClient, tenantID string, rule *model.MonitorAlertStrategy, template []model.MonitorTemplate) error
// GetLokiRule GETs url and decodes the YAML response into a rule group.
GetLokiRule(ctx context.Context, client client.RESTClient, url string) (*monitoringV1.RuleGroup, error)
// DeleteLokiRule sends a DELETE for the rule addressed by region and ruleName.
DeleteLokiRule(ctx context.Context, client client.RESTClient, region, ruleName string) error
}
// loki is the Loki implementation returned by NewLoki; it has no fields.
type loki struct {
}
// NewLoki returns a ready-to-use Loki implementation.
func NewLoki() Loki {
	return new(loki)
}
// SyncLokiRule renders rule into a rule group and pushes it to Loki.
//
// The rule group produced by GenerateRuleGroup is marshalled to YAML and
// POSTed to "<LokiUrl>/loki/api/v1/rules/<region>", where region is read from
// rule.Resources. Loki is expected to answer 202 Accepted. Any other status
// is returned as an error carrying the response body. The response body is
// closed on every path.
func (l *loki) SyncLokiRule(ctx context.Context, c client.RESTClient, tenantID string, rule *model.MonitorAlertStrategy, template []model.MonitorTemplate) error {
	// Build the rule group from the strategy and its templates.
	rules := GenerateRuleGroup(rule, tenantID, template)
	data, err := yaml.Marshal(rules)
	if err != nil {
		return err
	}

	// Send the request to Loki.
	uri := fmt.Sprintf("/loki/api/v1/rules/%s", rule.Resources.Get("region"))
	lokiUrl := fmt.Sprintf("%s%s", config.SysConfig.LogCenter.LokiUrl, uri)
	log.WithName("syncLokiRule").V(7).Info("向loki发起创建告警规则请求", "地址", lokiUrl)
	resp, err := c.Do(ctx, http.MethodPost, lokiUrl, data, client.WithYamlHeader)
	if err != nil {
		return err
	}
	// Close on the success path too, otherwise the connection is leaked.
	defer resp.Body.Close()

	if resp.StatusCode == http.StatusAccepted {
		return nil
	}
	body, readErr := ioutil.ReadAll(resp.Body)
	if readErr != nil {
		return readErr
	}
	return fmt.Errorf("request error [%s]", string(body))
}
// GetLokiRule GETs url and decodes the YAML response into a rule group.
//
// The response body is closed on every path, including a failed read. A
// non-2xx status is returned as an error carrying the response body instead
// of being passed to the YAML decoder.
func (l *loki) GetLokiRule(ctx context.Context, client client.RESTClient, url string) (*monitoringV1.RuleGroup, error) {
	response, err := client.Do(ctx, http.MethodGet, url, nil)
	if err != nil {
		return nil, err
	}
	// Register the close before reading so a read error cannot leak the body.
	defer response.Body.Close()

	byteData, err := io.ReadAll(response.Body)
	if err != nil {
		return nil, err
	}
	if response.StatusCode < http.StatusOK || response.StatusCode >= http.StatusMultipleChoices {
		return nil, fmt.Errorf("request error [%s]", string(byteData))
	}

	rule := &monitoringV1.RuleGroup{}
	if err = yaml.Unmarshal(byteData, rule); err != nil {
		return nil, err
	}
	return rule, nil
}
// DeleteLokiRule deletes one alert rule through the Loki API.
//
// It sends DELETE to "<LokiUrl>/loki/api/v1/rules/<region>/<ruleName>". The
// response body is always drained and closed. Any 2xx status counts as
// success. 404 is also treated as success, so deleting a rule that is already
// gone keeps returning nil as it did before. Every other status is returned
// as an error carrying the response body.
func (l *loki) DeleteLokiRule(ctx context.Context, client client.RESTClient, region, ruleName string) error {
	// Call the Loki API to delete the alert rule.
	uri := fmt.Sprintf("/loki/api/v1/rules/%s/%s", region, ruleName)
	lokiUrl := fmt.Sprintf("%s%s", config.SysConfig.LogCenter.LokiUrl, uri)
	resp, err := client.Do(ctx, http.MethodDelete, lokiUrl, nil)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return err
	}
	switch {
	case resp.StatusCode >= http.StatusOK && resp.StatusCode < http.StatusMultipleChoices:
		return nil
	case resp.StatusCode == http.StatusNotFound:
		// The rule does not exist, which is the state the caller asked for.
		return nil
	default:
		return fmt.Errorf("request error [%s]", string(body))
	}
}