Snapshot and Cleanup Policy Scripts for Elasticsearch Indices

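I. Introduction

When ES stores the logs of connected applications, the number of log indices grows every day. In practice, logs usually need to be searchable for about half a month and retained for six months to a year. Each day's application logs are generally written to an ES index named after that day's date, so a query touches at most the indices of the last half month. Indices older than that can be snapshotted to files and stored on a file system, then restored from the snapshot when a special case calls for querying them. This reduces the index load on ES and improves query efficiency.

Requirements

Snapshot log indices older than half a month to files and store them in a snapshot repository
Delete the log indices that have been snapshotted
Periodically detect indices created more than 15 days ago, then snapshot and clean them up
Send a DingTalk notification listing the names of the indices deleted after snapshotting
Raise an alert when the script fails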
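
The requirements above come down to one ordering rule: an index is deleted only after its snapshot has succeeded, and only the indices that went through both steps are reported. The following is a minimal Python sketch of that rule, not the scripts from this article. The names `snapshot_then_delete`, `notification_text`, `snapshot_fn` and `delete_fn` are hypothetical; the two callables are injected so the flow can be exercised with in-memory fakes instead of a real cluster.

```python
def snapshot_then_delete(names, snapshot_fn, delete_fn):
    """Snapshot each index and delete it only if the snapshot state is SUCCESS.

    snapshot_fn(name) is assumed to return a dict shaped like the
    Elasticsearch create-snapshot response: {"snapshot": {"state": ...}}.
    Returns (done, failed) lists of index names.
    """
    done, failed = [], []
    for name in names:
        try:
            state = snapshot_fn(name)["snapshot"]["state"]
        except Exception:
            # Any error while snapshotting counts as a failure: do not delete.
            state = "FAILED"
        if state == "SUCCESS":
            delete_fn(name)
            done.append(name)
        else:
            failed.append(name)
    return done, failed


def notification_text(done, days):
    """Build the text of the notification for the processed indices."""
    if not done:
        return "No indices older than %d days needed a snapshot." % days
    return "Snapshotted and deleted indices older than %d days:\n%s" % (
        days, "\n".join(done))


# Usage with in-memory fakes: "app-b" simulates a snapshot that did not succeed.
deleted = []

def fake_snapshot(name):
    state = "PARTIAL" if name == "app-b" else "SUCCESS"
    return {"snapshot": {"state": state}}

done, failed = snapshot_then_delete(["app-a", "app-b"], fake_snapshot, deleted.append)
print(notification_text(done, 15))
```

Keeping the failed names in a separate list makes it straightforward to forward them to an alerting channel while the notification covers only what was actually removed.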

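II. Shell script based on the REST API

1. Shell script

Parameters are set through environment variables
After the script finishes, it sends a DingTalk notification listing the ES indices it handled
Sentry alerting is integrated: whenever the script fails, the event is sent to Sentry, which then sends an email alert
The script can be scheduled with Linux cron or a Kubernetes CronJob to run at 1:00 AM every day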

#!/bin/bash
export SENTRY_DSN=*****@sentry@example/28
# Report script errors to Sentry
eval "$(sentry-cli bash-hook)"

# Parameters come from environment variables; ${VAR:-default} supplies the fallback
elasticsearch_host=${ELASTICSEARCH_HOST:-192.168.10.22}
elasticsearch_username=${ELASTICSEARCH_USERNAME:-cronjob}
elasticsearch_password=${ELASTICSEARCH_PASSWORD:-******}
elasticsearch_index_expiry_day=${ELASTICSEARCH_INDEX_EXPIRY_DAY:-15}
elasticsearch_exclude_index=${ELASTICSEARCH_EXCLUDE_INDEX:-.*}
elasticsearch_snapshots_repository=${ELASTICSEARCH_SNAPSHOTS_REPOSITORY:-***}

elasticsearch_index_expiry_sec=$((elasticsearch_index_expiry_day*86400))
elasticsearch_url="${elasticsearch_host}:9200"

# Indices to process = all indices minus the excluded ones
allIndex=$(curl -s -u "${elasticsearch_username}:${elasticsearch_password}" -XGET "${elasticsearch_url}/_cat/indices/_all?h=index")
excludeIndex=$(curl -s -u "${elasticsearch_username}:${elasticsearch_password}" -XGET "${elasticsearch_url}/_cat/indices/${elasticsearch_exclude_index}/?h=i")
indices=$(echo -e "${allIndex}\n${excludeIndex}" | sort | uniq -u)

# Collect the indices whose creation date (cd, in epoch milliseconds) is past the expiry period
for i in $indices ; do
    createdateincludemesc=$(curl -s -u "${elasticsearch_username}:${elasticsearch_password}" -XGET "${elasticsearch_url}/_cat/indices/$i?h=cd")
    createdate=$((createdateincludemesc/1000))
    currentdate=$(date +%s)
    durationtime=$((currentdate-createdate))
    if [ $durationtime -gt $elasticsearch_index_expiry_sec ] ; then
        snapshotsIndices=$i"\n"${snapshotsIndices}
    fi
done

# Snapshot each expired index, and delete it only if the snapshot request returned 200
for i in $(echo -e $snapshotsIndices) ; do
    if [ "$(curl -o /dev/null -w "%{http_code}\n" -s -u "${elasticsearch_username}:${elasticsearch_password}" -XPUT "${elasticsearch_url}/_snapshot/${elasticsearch_snapshots_repository}/%3C$i-%7Bnow%2Fd%7D%3E?wait_for_completion=true" -H 'Content-Type: application/json' -d'{"indices": "'$i'","ignore_unavailable": true,"include_global_state": false}')" = 200 ] ; then
        if [ "$(curl -o /dev/null -w "%{http_code}\n" -s -u "${elasticsearch_username}:${elasticsearch_password}" -XDELETE "${elasticsearch_url}/$i")" = 200 ] ; then
            echo -e "The index $i \t has been snapshotted to the repository and deleted!"
        else
            echo "$i failed to delete"
        fi
    else
        echo "$i failed to snapshot"
    fi
done

# DingTalk notification; '=*****' stands in for the robot webhook URL
curl -s -o /dev/null '=*****' -H 'Content-Type: application/json' -d '{"msgtype": "text","text": {"content": "Successfully snapshotted the following indices older than '"$elasticsearch_index_expiry_day"' days:\n'"$snapshotsIndices"'"}}'
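
The snapshot request in the script names each snapshot with a date-math expression: `%3C$i-%7Bnow%2Fd%7D%3E` is the URL-encoded form of `<index-{now/d}>`, which Elasticsearch resolves to the index name followed by the current day. The sketch below, in Python rather than shell, shows how that URL can be assembled from environment variables with defaults. It is an illustration only: the function name `snapshot_url`, the `http://` scheme and the fallback repository name `my-repo` are assumptions (the script's own default repository is masked).

```python
import os
from urllib.parse import quote


def snapshot_url(index, env=os.environ):
    """Build the create-snapshot URL for one index.

    The host default matches the shell script; "my-repo" is a hypothetical
    placeholder because the script's default repository name is masked.
    """
    host = env.get("ELASTICSEARCH_HOST", "192.168.10.22")
    repo = env.get("ELASTICSEARCH_SNAPSHOTS_REPOSITORY", "my-repo")
    # Date math must be percent-encoded in the path: < > { } and / are all escaped.
    name = quote("<%s-{now/d}>" % index, safe="")
    return "http://%s:9200/_snapshot/%s/%s?wait_for_completion=true" % (host, repo, name)


# With an empty environment mapping the defaults apply.
print(snapshot_url("app-2024.01.01", env={}))
```

Passing `safe=""` matters here: by default `quote` leaves `/` unescaped, which would split `{now/d}` into two path segments.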

2. Deploying and running the script

① Linux cron
echo "0 1 * * * root /opt/es-index-snapshots.sh" >> /etc/crontab
② Kubernetes CronJob

1. Build the CronJob image

Dockerfile

FROM centos:7
RUN curl -sL / | bash
ADD ./es-index-snapshots.sh /usr/sbin/es-index-snapshots.sh
ENTRYPOINT ["/bin/sh","-c"]
CMD ["/usr/sbin/es-index-snapshots.sh"]

docker build -t es-index-snapshots:v1 .

2. Kubernetes resource manifest
# batch/v1beta1 was removed in Kubernetes 1.25; clusters on 1.21 or later can use batch/v1
apiVersion: batch/v1beta1
kind: CronJob
metadata:
  name: es-index-snapshots-cronjob
  namespace: logging
spec:
  concurrencyPolicy: Allow
  failedJobsHistoryLimit: 1
  schedule: "0 1 * * *"
  startingDeadlineSeconds: 200
  successfulJobsHistoryLimit: 3
  suspend: false
  jobTemplate:
    spec:
      template:
        spec:
          containers:
          - env:
            - name: TZ
              value: Asia/Shanghai
            - name: ELASTICSEARCH_HOST
              value: "*.logging.svc"
            - name: ELASTICSEARCH_USERNAME
              value: cronjob
            - name: ELASTICSEARCH_PASSWORD
              value: "***"
            - name: ELASTICSEARCH_INDEX_EXPIRY_DAY
              value: "15"
            - name: ELASTICSEARCH_EXCLUDE_INDEX
              value: ".*"
            - name: ELASTICSEARCH_SNAPSHOTS_REPOSITORY
              value: "***"
            image: es-index-snapshots:v1
            imagePullPolicy: Always
            name: es-index-snapshots-cronjob
            resources:
              limits:
                cpu: 600m
                memory: 800Mi
              requests:
                cpu: 300m
                memory: 500Mi
            terminationMessagePath: /dev/termination-log
            terminationMessagePolicy: File
          dnsPolicy: ClusterFirst
          imagePullSecrets:
          - name: harbor-secrets
          restartPolicy: OnFailure
          schedulerName: default-scheduler
          securityContext: {}
          terminationGracePeriodSeconds: 30
kubectl apply -f <manifest file>

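III. Script based on the Python SDK

1. Python script

The script works from each index's creation time. For example, when it is set to snapshot and delete indices older than 15 days, the 15-day period is counted from the index's creation time
It periodically snapshots indices older than the configured period to files, stores them in the snapshot repository, and then deletes them
It sends a DingTalk notification with the names of the processed indices
It raises a Sentry alert when the script fails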
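
The age rule described above can be isolated into a small pure function. This is a sketch under stated assumptions, not code from the script: `expired_indices` and its `now` parameter are hypothetical names, and the input rows are assumed to look like the JSON returned by `GET _cat/indices?h=index,cd&format=json`, where `cd` is the creation time in epoch milliseconds.

```python
import time

DAY_SECONDS = 86400


def expired_indices(rows, retain_days, now=None):
    """Return the names of indices created more than retain_days ago.

    rows: list of dicts with "index" (name) and "cd" (creation time,
    epoch milliseconds, usually delivered as a string).
    now:  current time in epoch seconds; injectable to keep the check testable.
    """
    now = time.time() if now is None else now
    limit = retain_days * DAY_SECONDS
    return [row["index"] for row in rows
            if now - int(row["cd"]) / 1000 > limit]


sample = [
    {"index": "app-2024.01.01", "cd": "1704067200000"},  # created 2024-01-01 UTC
    {"index": "app-2024.01.20", "cd": "1705708800000"},  # created 2024-01-20 UTC
]
# Evaluate as of 2024-01-21 00:00:00 UTC: only the first index is older than 15 days.
print(expired_indices(sample, 15, now=1705795200))
```

Because the comparison uses the creation timestamp rather than the date in the index name, an index that is renamed or created late for an earlier day is still aged from the moment it was created.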

#!/usr/bin/python3
# -*- coding: UTF-8 -*-
import json,time,requests,sentry_sdk
from elasticsearch import Elasticsearch

es = Elasticsearch(
    ["192.168.10.60","192.168.10.70"],
    # Privileges required for the ES user: cluster: monitor, create_snapshot; index: * (all indices) monitor, delete_index
    http_auth=('es username', 'es password'),
    scheme="http",
    port=9200,
    http_compress=True
)

app_index_retain_day=15
nginx_index_retain_day=15

# Sentry DSN
sentry_sdk.init(dsn='*****:*****&#ample/12')

# DingTalk robot token
dingding_webhook_token="*****"

# Get all indices with their creation date (cd, epoch milliseconds)
def getAllIndex():
    return es.cat.indices('*', h='index,cd', format='json', s='index')

# Drop indices that start with ".", have malformed names, or should be excluded
def getExcludeSystemAndAberrantIndex():
    return list(filter(lambda x: (not ( x['index'].startswith('.') or '%{[app]}' in x['index'] or x['index'].startswith('gitlab-production') or x['index'].startswith('jaeger') )), getAllIndex()))

# Get application log indices
def getAppIndex():
    return list(filter(lambda x: ( not ('nginx' in x['index'] or 'mysql-slowlog' in x['index'] )), getExcludeSystemAndAberrantIndex()))

# Get Nginx log indices
def getNginxIndex():
    return list(filter(lambda x: ( 'nginx' in x['index'] ), getExcludeSystemAndAberrantIndex()))

# Get MySQL slow query log indices
def getMysqlSlowQueryLogIndex():
    return list(filter(lambda x: ('mysql-slowlog' in x['index']), getAllIndex()))

# Snapshot an index
def snapshotIndex(index):
    index_body = {"indices": index }
    print(index)
    return es.snapshot.create(body=index_body, repository='NAS-NFS-Snapshots-Repository', wait_for_completion='true', request_timeout=300, snapshot=index+'-snapshoted-'+time.strftime('%m-%d'))

# Delete an index
def deleteIndex(index):
    es.indices.delete(index=index)

# DingTalk notification
def dingdingNotification(token,msg,day):
    # "=" stands in for the robot webhook URL prefix
    url = "="+token
    headers = { "Content-Type": "application/json", "Charset": "UTF-8" }
    # Build the payload for the POST request
    data = {"msgtype": "text","text": {"content": msg+"\n"},"at": {"isAtAll": 'true'}}
    if not requests.post(url, data=json.dumps(data), headers=headers) :
        print("Failed to send the DingTalk notification!")
        sentry_sdk.capture_exception(Exception("Failed to send the DingTalk notification!"))

# Index getter and display label for each supported log type
INDEX_TYPES = {
    'app': (getAppIndex, 'application log'),
    'nginx': (getNginxIndex, 'Nginx log'),
    'mysqlslowlog': (getMysqlSlowQueryLogIndex, 'MySQL slow query log'),
}

# Snapshot log indices created more than the given number of days ago to the repository, then delete them
def snapshotAndDeleteAppIndex(type,day):
    if type not in INDEX_TYPES:
        return
    get_indices, label = INDEX_TYPES[type]
    snapshoted_deleted_indices = []
    for i in get_indices():
        cts = time.time()
        if (cts - int(i["cd"])/1000) > day*86400 :
            if 'SUCCESS' in snapshotIndex(i["index"])['snapshot']["state"]:
                deleteIndex(i['index'])
                print(i['index'] + " has been snapshotted and deleted from ES!")
                snapshoted_deleted_indices.append(i['index'])
            else:
                print(label + " index " + i['index'] + " failed to snapshot")
                sentry_sdk.capture_exception(Exception(label + " index " + i['index'] + " failed to snapshot"))
                continue
    if snapshoted_deleted_indices :
        Notification_Context = "[Index snapshot cleanup job]\nSuccessfully snapshotted the following " + label + " indices older than " + str(day) + " days\n" + "\n".join(str(i) for i in snapshoted_deleted_indices)
    else:
        Notification_Context = "[Index snapshot cleanup job]\nNo " + label + " indices older than " + str(day) + " days need to be snapshotted and deleted!"
    dingdingNotification(dingding_webhook_token, Notification_Context, day)

def main():
    print("=====================" + time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + " start cleaning indices in ES =====================")
    print("................ start cleaning application indices older than " + str(app_index_retain_day) + " days ................")
    # Snapshot and delete application indices older than the configured period
    snapshotAndDeleteAppIndex('app',app_index_retain_day)
    print("................ start cleaning nginx indices older than " + str(nginx_index_retain_day) + " days ................")
    # Snapshot and delete Nginx indices older than the configured period
    snapshotAndDeleteAppIndex('nginx',nginx_index_retain_day)
    exit(0)

if __name__ == "__main__" :
    main()
2. Dependencies
elasticsearch==7.0.0
pyyaml
requests
sentry_sdk
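3. Steps

Python version: 3

Default cleanup policy

Snapshot and delete application log indices created before the configured date
Snapshot and delete Nginx log indices created before the configured date (indices whose names contain the keyword nginx)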
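
The policy depends on sorting index names into groups by substring. Below is a simplified sketch of that sorting as a single function; the name `classify` and the group label `skip` are hypothetical. It assigns each name to exactly one group by checking in a fixed order, whereas filters that run independently can place one name in more than one group.

```python
EXCLUDED_PREFIXES = (".", "gitlab-production", "jaeger")


def classify(index_name):
    """Return 'mysqlslowlog', 'skip', 'nginx' or 'app' for an index name."""
    # MySQL slow query log indices are matched first, on any index name.
    if "mysql-slowlog" in index_name:
        return "mysqlslowlog"
    # System indices, unrendered name templates and excluded prefixes are skipped.
    if index_name.startswith(EXCLUDED_PREFIXES) or "%{[app]}" in index_name:
        return "skip"
    if "nginx" in index_name:
        return "nginx"
    # Everything left is treated as an application log index.
    return "app"


for name in (".kibana", "web-nginx-2024.01.01", "order-service-2024.01.01"):
    print(name, "->", classify(name))
```

The match is case-sensitive, so an index named with a capitalized `Nginx` would fall into the application group.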
Install dependencies

  pip3 install -r <requirements file>

Run the script


PYTHONIOENCODING=utf-8 python3 es-index-snapshots-clean.py

Run the script on a schedule with crontab: every day at 1:00 AM

  0 1 * * * python3 es-index-snapshots-clean.py
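
A user crontab entry has five time fields (minute, hour, day of month, month, day of week) followed by the command; there is no seconds field as in Quartz-style expressions. The sketch below splits an entry into named parts to make the layout explicit. `parse_crontab_line` is a hypothetical helper, it does not validate field values, and it does not handle the extra user column used in `/etc/crontab`.

```python
FIELDS = ("minute", "hour", "day_of_month", "month", "day_of_week")


def parse_crontab_line(line):
    """Split a user crontab entry into its five time fields and the command."""
    parts = line.split(None, 5)  # at most 5 splits: 5 fields + the rest as command
    if len(parts) < 6:
        raise ValueError("expected 5 time fields followed by a command")
    entry = dict(zip(FIELDS, parts[:5]))
    entry["command"] = parts[5]
    return entry


entry = parse_crontab_line("0 1 * * * python3 es-index-snapshots-clean.py")
print(entry["hour"], entry["minute"], entry["command"])
```

Read this way, `0 1 * * *` means minute 0 of hour 1 on every day, which is the 1:00 AM schedule described above.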

IV. The official Curator index management tool

.8/index.html

V. ES's built-in ILM (index lifecycle management) feature

.17/index-lifecycle-management.html

Published: 2024-02-02 23:54:39

Article link: https://www.4u4v.net/it/170688927747300.html
