基于loki-ruler组件实现日志报警

loki有个ruler组件负责日志的报警,会轮询调用loki api执行LogQL语句判断规则是否触发,loki-ruler将产生的报警事件发送到alertmanager中。我们平台基于loki-ruler封装了对日志报警规则的增删查

安装

如果是使用helm安装的loki,需要在values文件中将ruler组件的enabled改为true

API

平台侧使用api完成loki告警规则的增删查

Loki地址: 192.168.1.100:3000

查看告警规则

GET	192.168.1.100:3000/loki/api/v1/rules

返回体是yaml格式的,如下:

cloud:
    - name: cloud-errordemo
      rules:
          - alert: cloud-errordemo
            expr: 'sum(count_over_time({job="kubemanage/errordemo",region="cloud"} |~ "(?i)error"[1m])) by (namespace,pod) > 0 '
            for: 1s
            labels:
                kubemanage_alertname: cloud-errordemo
                kubemanage_alertnamespace: kubemanage
                kubemanage_alerttype: "2"
                kubemanage_template: 日志报错error
                kubemanage_tenant: "1"
                namespace: monitoring
            annotations:
                App: errordemo
                Description: 实例:{{$labels}}当前值{{$value}}
                Level: L1
                Namespace: kubemanage
                Region: cloud
                Resources: '{"app":"errordemo","namespace":"kubemanage","region":"cloud"}'
                Template: 日志报错error

创建告警规则

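POST 192.168.1.100:3000/loki/api/v1/rules/cloud

在平台侧我们以集群标签(region)作为loki规则的命名空间(namespace),以告警策略名作为规则组(group)的名称;规则组以yaml格式放在请求体中(Content-Type为application/yaml),组名取自请求体里的name字段,因此创建时URL中只需要带命名空间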

删除单条告警规则

DELETE 192.168.1.100:3000/loki/api/v1/rules/cloud/cloud-errordemo

返回体Json结构:

{
    "status": "success",
    "data": null,
    "errorType": "",
    "error": ""
}

源码分析

封装的http client

package client

import (
	"bytes"
	"context"
	"encoding/json"
	"io"
	"io/ioutil"
	"net/http"
	"time"

	"github.com/noovertime7/kubemanage/pkg/log"
)

// RESTClient is the HTTP client abstraction exposed by this package.
// The per-method notes below describe the restClient implementation in this file.
type RESTClient interface {
	// GET sends a GET request to url and returns the raw response body.
	GET(ctx context.Context, url string) ([]byte, error)
	// POSTWithJson JSON-encodes body (when non-nil), POSTs it to url with
	// Content-Type application/json and returns the raw response body.
	POSTWithJson(ctx context.Context, url string, body interface{}) ([]byte, error)
	// GETBind performs GET and JSON-decodes the response body into bind.
	GETBind(ctx context.Context, url string, bind interface{}) error
	// POSTWithJsonBind performs POSTWithJson and JSON-decodes the response body into bind.
	POSTWithJsonBind(ctx context.Context, url string, body interface{}, bind interface{}) error
	// Do sends a request with an arbitrary method and raw body, applying ops to
	// the request first. The *http.Response is returned as-is, so the caller is
	// the one who has to close its Body.
	Do(ctx context.Context, method, url string, body []byte, ops ...RequestOption) (*http.Response, error)
}

// DefaultClient is a package-level *http.Client with a 3-second request timeout.
var DefaultClient = &http.Client{Timeout: 3 * time.Second}

const OK = "OK"

// HttpStatus 检测状态码是否为200工具函数
var HttpStatus = func(resp *http.Response, code int) (bool, string) {
	defer resp.Body.Close()
	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		return false, err.Error()
	}
	if resp.StatusCode != code {
		return false, string(body)
	}
	return true, "OK"
}

// NewRESTClient wraps client in a RESTClient.
//
// A nil client falls back to DefaultClient, so the returned RESTClient never
// dereferences a nil *http.Client when a request is issued.
func NewRESTClient(client *http.Client) RESTClient {
	if client == nil {
		client = DefaultClient
	}
	return &restClient{client: client}
}

// restClient is the RESTClient implementation returned by NewRESTClient.
type restClient struct {
	// client performs the underlying HTTP requests.
	client *http.Client
}

// GET sends a GET request to url and returns the complete response body.
//
// The request is bound to ctx, so cancelling ctx (or hitting its deadline)
// aborts the call. A nil ctx is treated as context.Background() so callers
// that previously passed nil keep working. The HTTP status code is not
// inspected: the body is returned whatever the status is.
func (r *restClient) GET(ctx context.Context, url string) ([]byte, error) {
	if ctx == nil {
		ctx = context.Background()
	}

	req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
	if err != nil {
		return nil, err
	}

	resp, err := r.client.Do(req)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		return nil, err
	}
	return body, nil
}

// GETBind fetches url with GET and JSON-decodes the response body into bind.
//
// bind must be a pointer accepted by json.Unmarshal. Any error from the
// request or from decoding is returned unchanged.
func (r *restClient) GETBind(ctx context.Context, url string, bind interface{}) error {
	payload, getErr := r.GET(ctx, url)
	if getErr != nil {
		return getErr
	}
	return json.Unmarshal(payload, bind)
}

// POSTWithJson JSON-encodes body and POSTs it to url with
// Content-Type application/json, returning the complete response body.
//
// A nil body results in a POST with an empty payload. The request is bound
// to ctx, so cancelling ctx (or hitting its deadline) aborts the call; a nil
// ctx is treated as context.Background() so callers that previously passed
// nil keep working. The HTTP status code is not inspected.
func (r *restClient) POSTWithJson(ctx context.Context, url string, body interface{}) ([]byte, error) {
	if ctx == nil {
		ctx = context.Background()
	}

	var payload []byte
	if body != nil {
		encoded, err := json.Marshal(body)
		if err != nil {
			return nil, err
		}
		payload = encoded
	}

	req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(payload))
	if err != nil {
		return nil, err
	}
	req.Header.Set("Content-Type", "application/json")

	resp, err := r.client.Do(req)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	respBody, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		return nil, err
	}
	return respBody, nil
}

// POSTWithJsonBind POSTs body as JSON to url and JSON-decodes the response
// body into bind.
//
// bind must be a pointer accepted by json.Unmarshal. Any error from the
// request or from decoding is returned unchanged.
func (r *restClient) POSTWithJsonBind(ctx context.Context, url string, body interface{}, bind interface{}) error {
	payload, postErr := r.POSTWithJson(ctx, url, body)
	if postErr != nil {
		return postErr
	}
	return json.Unmarshal(payload, bind)
}

// RequestOption is a hook that receives an outgoing *http.Request and returns
// a *http.Request; restClient.Do invokes each option before sending.
type RequestOption func(req *http.Request) *http.Request

// WithYamlHeader sets the request's Content-Type header to "application/yaml"
// and returns the same request. Its signature matches RequestOption.
var WithYamlHeader = func(req *http.Request) *http.Request {
	req.Header.Set("Content-Type", "application/yaml")
	return req
}

// Do builds a request for method and url with body as payload, applies ops
// in order and sends it with the underlying client.
//
// The response is returned untouched: the caller must close resp.Body.
// An empty body sends a request with no payload.
//
// Each option's return value replaces the request, so an option may return a
// derived request (for example one produced by req.Clone); nil options and
// options returning nil are ignored.
func (r *restClient) Do(ctx context.Context, method, url string, body []byte, ops ...RequestOption) (*http.Response, error) {
	// Pass the *bytes.Reader itself rather than hiding it behind
	// io.NopCloser: net/http only fills in ContentLength and GetBody when it
	// can recognise the concrete reader type.
	var payload io.Reader
	if len(body) > 0 {
		payload = bytes.NewReader(body)
	}

	req, err := http.NewRequestWithContext(ctx, method, url, payload)
	if err != nil {
		return nil, err
	}

	log.Debugf("向[%s]发送[%s]请求", url, method)

	for _, op := range ops {
		if op == nil {
			continue
		}
		if updated := op(req); updated != nil {
			req = updated
		}
	}

	return r.client.Do(req)
}

基于上面封装的http client,继续封装对loki告警规则接口的调用

package handler

import (
	"context"
	"fmt"
	"io"
	"io/ioutil"
	"net/http"

	monitoringV1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1"
	"sigs.k8s.io/yaml"
    
	"github.com/noovertime7/kubemanage/cmd/app/config"
	"github.com/noovertime7/kubemanage/dao/model"
	"github.com/noovertime7/kubemanage/pkg/log"
	"github.com/noovertime7/kubemanage/runtime/client"
)

// Loki groups the operations used to manage Loki alert rules over HTTP.
type Loki interface {
	// SyncLokiRule turns rule into a rule group and POSTs it to Loki as YAML.
	SyncLokiRule(ctx context.Context, c client.RESTClient, tenantID string, rule *model.MonitorAlertStrategy, template []model.MonitorTemplate) error
	// GetLokiRule requests url with GET and decodes the YAML response into a RuleGroup.
	GetLokiRule(ctx context.Context, client client.RESTClient, url string) (*monitoringV1.RuleGroup, error)
	// DeleteLokiRule sends a DELETE request for ruleName under region.
	DeleteLokiRule(ctx context.Context, client client.RESTClient, region, ruleName string) error
}

// loki is the Loki implementation returned by NewLoki; it has no fields.
type loki struct {
}

// NewLoki returns a new Loki.
func NewLoki() Loki {
	return &loki{}
}

// SyncLokiRule renders rule into a rule group and pushes it to Loki.
//
// The group produced by GenerateRuleGroup is serialized to YAML and POSTed to
// <LokiUrl>/loki/api/v1/rules/<region>, where region is read from
// rule.Resources. Any response status other than 202 Accepted is treated as a
// failure and the response body is returned inside the error.
//
// The response body is always closed, including on the success path.
func (l *loki) SyncLokiRule(ctx context.Context, c client.RESTClient, tenantID string, rule *model.MonitorAlertStrategy, template []model.MonitorTemplate) error {
	// Build the rule group from the strategy and its templates.
	rules := GenerateRuleGroup(rule, tenantID, template)
	data, err := yaml.Marshal(rules)
	if err != nil {
		return err
	}
	// Send the request to Loki.
	uri := fmt.Sprintf("/loki/api/v1/rules/%s", rule.Resources.Get("region"))
	lokiUrl := fmt.Sprintf("%s%s", config.SysConfig.LogCenter.LokiUrl, uri)
	log.WithName("syncLokiRule").V(7).Info("向loki发起创建告警规则请求", "地址", lokiUrl)

	resp, err := c.Do(ctx, http.MethodPost, lokiUrl, data, client.WithYamlHeader)
	if err != nil {
		return err
	}
	// Close on every path; previously the body leaked when Loki answered 202.
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusAccepted {
		body, err := ioutil.ReadAll(resp.Body)
		if err != nil {
			return err
		}
		return fmt.Errorf("request error [%s]", string(body))
	}
	return nil
}

// GetLokiRule requests url with GET and decodes the YAML response body into
// a RuleGroup.
//
// A non-2xx response is reported as an error carrying the status and the
// response body, instead of being fed to the YAML decoder. The response body
// is closed on every path, including when reading it fails.
func (l *loki) GetLokiRule(ctx context.Context, client client.RESTClient, url string) (*monitoringV1.RuleGroup, error) {
	response, err := client.Do(ctx, http.MethodGet, url, nil)
	if err != nil {
		return nil, err
	}
	// Register the close before reading so a failed read cannot leak the body.
	defer response.Body.Close()

	byteData, err := io.ReadAll(response.Body)
	if err != nil {
		return nil, err
	}
	if response.StatusCode < http.StatusOK || response.StatusCode >= http.StatusMultipleChoices {
		return nil, fmt.Errorf("request error: status %d [%s]", response.StatusCode, string(byteData))
	}

	rule := &monitoringV1.RuleGroup{}
	if err = yaml.Unmarshal(byteData, &rule); err != nil {
		return nil, err
	}
	return rule, nil
}

// DeleteLokiRule asks Loki to delete the rule group ruleName under region by
// sending DELETE <LokiUrl>/loki/api/v1/rules/<region>/<ruleName>.
//
// The response body is always closed. Any 2xx status counts as success.
// 404 Not Found is also treated as success so that deleting an already
// absent rule still returns nil, as it did before the status was checked.
// Every other status is returned as an error carrying the response body.
func (l *loki) DeleteLokiRule(ctx context.Context, client client.RESTClient, region, ruleName string) error {
	// Call the Loki API to delete the alert rule.
	uri := fmt.Sprintf("/loki/api/v1/rules/%s/%s", region, ruleName)
	lokiUrl := fmt.Sprintf("%s%s", config.SysConfig.LogCenter.LokiUrl, uri)

	resp, err := client.Do(ctx, http.MethodDelete, lokiUrl, nil)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	succeeded := resp.StatusCode >= http.StatusOK && resp.StatusCode < http.StatusMultipleChoices
	if succeeded || resp.StatusCode == http.StatusNotFound {
		return nil
	}

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return err
	}
	return fmt.Errorf("request error: status %d [%s]", resp.StatusCode, string(body))
}