Pyodps 脚本集合

阅读: 评论:0

Pyodps 脚本集合

Pyodps 脚本集合

ODPS日常python运维脚本集合

1、获取我们odps项目中的表详细信息,用于数据编目等业务。
2、获取我们项目中的表数据量等基本信息,用于数据对账等等。

Python表字段信息获取

Sql建表语句

create table if not exists `stg_table_comment_all_df` 
(`table_schema` string comment '表空间名称',`table_name` string comment '表名称',`table_comment` string comment '表注释',`col_name` string comment '列名称',`col_type` string comment '列类型',`col_comment` string comment '列注释'
)
comment 'odsp_表字段信息获取'
partitioned by 
(dt string comment '分区'
)
lifecycle 30
;

代码示例

# -*- coding: utf-8 -*-
from odps import ODPS
import datetime# python3 将下面5行删除即可
import sys
import importlib
# 防止中文注释乱码
load(sys)
sys.setdefaultencoding("utf8")# odps 项目定义
o = ODPS('MDCAy***yMiN', 'aV2*****CpJS','MC_CY_YXGL_SOURCE_****','')
# 定义表名筛选的前后缀
dt_str = (w() + datetime.timedelta(days=-1)).strftime('%Y%m%d')
ODS_PREFIX = ''
wd = []
# 定义项目空间 多个空间就写多个
project = ['mc_cy_yxgl_source_01', 'mc_cy_yxgl_dwd_01']
# 循环我们的项目空间一个一个来
for pro in project:ods_table_list = o.list_tables(pro)for t in ods_table_list:table_name = t.name# print (table_name)# 获取表注释table_comment = tmentcols = lumnsfor col in cols:col_name = col.namecol_type = pecol_comment = colment# 将表结构写入结果表wd.append([pro, table_name, table_comment, col_name, str(col_type), col_comment])sta_table = o.get_table("stg_table_comment_all_df")
sta_table.delete_partition('dt=%s' % dt_str, if_exists=True)
with sta_table.open_writer(partition=('dt=%s' % dt_str), create_partition=True) as writer:writer.write(wd)

Python数据量获取脚本

Sql建表语句

CREATE TABLE IF NOT EXISTS stg_table_statistics_all_df
(table_schema              STRING COMMENT '表空间名称',table_name                STRING COMMENT '表名',partition_name            STRING COMMENT '最新分区',chinese_name              STRING COMMENT '中文表名',column_count              BIGINT COMMENT '字段数量',column_comment_null_count BIGINT COMMENT '字段注释缺失数量',pt_count                  BIGINT COMMENT '分区数量',data_count                BIGINT COMMENT '最新分区数据量',star_time                 STRING COMMENT '开始时间',end_time                  STRING COMMENT '结束时间'
) 
COMMENT '获取数据表情况统计'
PARTITIONED BY
(dt                        STRING
)
LIFECYCLE 180;

代码示例

# -*- coding: utf-8 -*-
from odps import ODPS
import datetime
import re#python3 将下面5行删除即可
import sys
import importlib
# 防止中文注释乱码
load(sys)
sys.setdefaultencoding("utf8")# odps 项目定义
o = ODPS('MDCAy***PyMiN', 'aV2zYceNK8i****vzmRCpJS','MC_CY_YXGL_SOURCE_****','')
# 控制分区
dt_str = (w() + datetime.timedelta(days=-1)).strftime('%Y%m%d')
# 控制数据过滤
rid_list = []
# 正则控制 数据过滤
pattern = ''
# 结果数组
wd = []
# 定义项目空间 多个空间就写多个
project = ['mc_cy_yxgl_source_01', 'mc_cy_yxgl_dwd_01', 'mc_cy_yxgl_dws_01', 'mc_cy_yxgl_dwb_01','mc_cy_yxgl_yqfk']
# 循环我们的项目空间一个一个来
for pro in project:ods_table_list = o.list_tables(pro)for t in ods_table_list:table_name = t.name# print (table_name)# 过滤非业务表if not re.match(pattern, table_name):# print (table_name)continue# 过滤利旧数据表if table_name in rid_list:continuechinese_name = tmentcs = [c for c in lumns]# 字段数量column_count = len(cs)# 字段注释缺失数量column_comment_null_count = 0for c in cs:if cment == '' or cment == 'null':column_comment_null_count += 1# 执行SQLcnt_sql = ''# 分区new_pt = ''#开始时间star_time = w().strftime("%Y-%m-%d %H:%M:%S")# 拿出我们没用分区的表if len(t.schema.partitions) == 0:# 拼接出我们要执行的sqlcnt_sql = "select count(1) from %s " % (pro + '.' + table_name)# 拿出有分区的数据表else:pi = t.iterate_partitions()# 进入分区循环ps = [p for p in pi]pt_count = len(ps)#存在分区字段 但是并没有分区if (pt_count == 0):cnt_sql = "select 0"else:new_pt = str(ps[-1])if ',' in new_pt:  # 多级分区的情况new_pt = place(',', ' and ')cnt_sql = "select count(1) from %s where %s" % (pro + '.' + table_name, new_pt)else : # 单级分区pt_count = 1cnt_sql = "select count(1) from %s " % (pro + '.' + table_name)ute_sql(cnt_sql).open_reader() as reader:data_count = reader[0][0]end_time = w().strftime("%Y-%m-%d %H:%M:%S")wd.append([pro, table_name, str(new_pt), chinese_name, column_count, column_comment_null_count,pt_count, data_count,star_time,end_time])sta_table = o.get_table("stg_table_statistics_all_df")
sta_table.delete_partition('dt=%s' % dt_str, if_exists=True)
with sta_table.open_writer(partition=('dt=%s' % dt_str), create_partition=True) as writer:writer.write(wd)

Pyodps 获取项目上下游信息

Sql建表语句

create table if not exists stg_meta_instances_df 
(ins_name string comment '实例名',start_tm datetime comment '开始时间',end_tm datetime comment '结束时间',cost_tm bigint comment '总耗时(秒)',status string comment '实例状态',ins_owner string comment '作者',tsk_name string comment '子任务',tbl_in string comment '输入表(以,分割)',tbl_out string comment '输出表(以,分割)',etl_tm datetime comment 'ETL时间'
)
comment 'OPDS血源信息表'
partitioned by 
(dt string comment '按日期分区'
)
lifecycle 30
;
#!/usr/local/anaconda3/bin/python3
# -*- coding: utf-8 -*-
# ###########################################################################################
# 执行环境:分析服务器
# 脚本: get_instance_meta.py
# 调度: 每日早6点调度
# 日志: get_instance_meta.log
# ###########################################################################################import os
import re
from datetime import datetime, date, time, timedelta
from odps import ODPS
dels import Schema, Column, Partitionstart_tm = w()
today_min = day(), time.min)
# cur_path = os.path.split(alpath(__file__))[0]to_table = 'stg_meta_instances_df'
odps = ODPS('MDCAy5vFzEDPyMiN', 'aV2zYceNK8iNrn31VXZPH9vzmRCpJS','MC_CY_YXGL_SOURCE_01','')columns = [Column(name='ins_name', type='string', comment='实例名'),Column(name='start_tm', type='datetime', comment='开始时间'),Column(name='end_tm', type='datetime', comment='结束时间'),Column(name='cost_tm', type='bigint', comment='总耗时(秒)'),Column(name='status', type='string', comment='实例状态'),Column(name='ins_owner', type='string', comment='作者'),Column(name='tsk_name', type='string', comment='子任务'),Column(name='tbl_in', type='string', comment='输入表(以,分割)'),Column(name='tbl_out', type='string', comment='输出表(以,分割)'),Column(name='etl_tm', type='datetime', comment='ETL时间')]
partitions = [Partition(name='pt', type='string', comment='按日期分区')]
schema = Schema(columns=columns, partitions=partitions)records = []
try:for ins in odps.list_instances(start_time=today_min,end_time=start_tm,only_owner=False,status='Terminated'):tsk_name_filter = [re.match('console_query_task', tsk) for tsk _task_names()]try:tsk_output_filter = [_task_summary(tsk) if _task_summary(tsk)_task_summary(tsk).get('Outputs')for tsk _task_names()]except:continueelse:pass# 这里过滤了没有输入表、输出表的实例。这段代码初衷就是提取表间依赖关系,所以没有考虑所有实例if ins.is_successful() and any(tsk_name_filter) and any(tsk_output_filter):start_time = ins.start_time + timedelta(hours=8)end_time = d_time + timedelta(hours=8)tbl_in = set()tbl_out = set()for tsk _task_names():smy = _task_summary(tsk)tbl_in.update([re.match(r'^[w].*.([w]+).*$', key).group(1) for key in smy['Inputs'].keys()])tbl_out.update([re.match(r'^[w].*.([w]+).*$', key).group(1) for key in smy['Outputs'].keys()])records.append([ins.name,start_time.strftime('%Y-%m-%d %H:%M:%S'),end_time.strftime('%Y-%m-%d %H:%M:%S'),(end_time - start_time).seconds,ins.status.value.lower(),ins.owner.split(':')[-1],','._task_names()) _task_names() else None,','.join(tbl_in) if tbl_in else None,','.join(tbl_out) if tbl_out else w().strftime('%Y-%m-%d %H:%M:%S')])               partition = '%s=%s' % (partitions[0].name, start_tm.strftime('%Y%m%d'))to_tbl = ate_table(to_table, schema, if_not_exists=True)to_tbl.delete_partition(partition, if_exists=True)odps.write_table(to_table, records, partition=partition, create_partition=True)except:status = 'failed'n = 0
else:status = 'succeed'n = len(records)# end_tm = w().strftime('%Y-%m-%d %H:%M:%S')
# log = {'status': status, 'n': n, 'start': start_tm, 'end': end_tm}
# f = open(os.path.join(cur_path, 'get_field_meta.log'), 'a')
# f.write("Update {status} with {n} instances from {start} to {end}n".format(**log))
# f.close()

Python定期备份Mysql

代码示例

#!/usr/bin/python
# -*- coding: UTF-8 -*-# 先安装PyMySQL模块 pip install PyMySQL# 忽略产生的警告是信息
import warnings
warnings.filterwarnings('ignore')# pyMysql CPython>= 2.6 or >= 3.3
import pymysql
# 导入 os 这个模块,因为要在 shell 中执行
import os
import timemysql_host = "127.0.0.1"
mysql_user = "root"
mysql_pwd = "root"
mysql_port = 3306
mysql_charset = "utf8"
new_date = time.strftime("%Y%m%d%H%M%S")
back_path = "/data/mysql_back/"
out_time = 7*24*60*60     # 指定多少秒前的数据删除try:if(not(ists(back_path))):os.makedirs(back_path)# 获取一个数据库连接,注意如果是UTF-8类型的,需要制定数据库conn = t(host=mysql_host,            # 数据库地址user=mysql_user,                 # 数据库用户名passwd=mysql_pwd,               # 数据库密码db='mysql',            # 数据名port=mysql_port,                   # 数据库访问端口charset=mysql_charset               # 数据库编码格式)cur = conn.cursor()              # 获取一个游标ute('show databases')    # 查询出所有数据库data = cur.fetchall()            # 查询出来,并赋值 datafor db_names in data:for db_name in db_names:if(db_name=='information_schema' or db_name=='performance_schema' or db_name=='mysql'):continueif(not(ists(back_path+db_name))):os.makedirs(back_path+db_name)path = back_path+db_name+"/"+new_date+".sql"   # 数据库备份路径os.system("mysqldump -h%s -u%s -p%s %s > %s" % (mysql_host, mysql_user, mysql_pwd, db_name, path))old_time = time.strftime("%Y%m%d",ime(time.time()-out_time))os.system("rm -f %s*.sql" % (back_path+db_name+"/"+old_time))cur.close()                      # 关闭游标conn.close()                     # 释放数据库资源except Exception: print("查询失败")

执行操作

# 安装下必要库 环境要求
yum -y install gcc gcc-c++ make zlib* openssl openssl-devel openssl-static -y
yum install python-setuptools -y
# 或者 yum install python3-setuptools -y
easy_install pip
# 或者 easy_install3 pip
# 安装PyMySQL
pip install PyMySQL/data/shell/mysql_back.py 执行脚本
#添加Linux定时任务命令
crontab -e
#进去添加下边代码
*/15 * * * * /data/shell/mysql_back.py > /data/shell/logs.log 2>&1 &
#定时时间格式    脚本路径/脚本
#:wq保存退出即可

本文发布于:2024-01-28 02:25:06,感谢您对本站的认可!

本文链接:https://www.4u4v.net/it/17063799124109.html

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。

标签:脚本   Pyodps
留言与评论(共有 0 条评论)
   
验证码:

Copyright ©2019-2022 Comsenz Inc.Powered by ©

网站地图1 网站地图2 网站地图3 网站地图4 网站地图5 网站地图6 网站地图7 网站地图8 网站地图9 网站地图10 网站地图11 网站地图12 网站地图13 网站地图14 网站地图15 网站地图16 网站地图17 网站地图18 网站地图19 网站地图20 网站地图21 网站地图22/a> 网站地图23