Python3.7,AT,PDU编码,并通过串口发送短信。

阅读: 评论:0

Python3.7,AT,PDU编码,并通过串口发送短信。

Python3.7,AT,PDU编码,并通过串口发送短信。

PDU Mode被所有手机支持,可以使用任何字符集,这也是手机默认的编码方式。网上C语言的编码实例很多,基于python写的不多,特基于python3.7实现AT指令发送短信,以及PDU编码,串口操作。
PDU编码规则网上资源很多,在线编码地址:
在线PDU
可以用于比对调试。
直接上代码。
# 处理收件人 “8613800280500” -> “683108200805F0”

def deal_iphonenum(demo):  # 收件人 "8613800280500" -> "683108200805F0"if len(demo) % 2 != 0:demo = demo + "F"else:demo = demolistdemo = []for x in range(len(demo)): listdemo.append(demo[x])qi = listdemo[::2]ou = listdemo[1::2]end = []if len(qi) == len(ou):  # 奇偶互换for i in range(len(qi)):end.append(ou[i])end.append(qi[i])return "".join(end)

中心号码 “8613800280500” -># “0891683108200805F0”


def deal_center_iphone_num(demo):# 中心号码 "8613800280500" -># "0891683108200805F0"if len(demo) % 2 != 0:demo = demo + "F"else:demo = demolistdemo = []for x in range(len(demo)): listdemo.append(demo[x])qi = listdemo[::2]ou = listdemo[1::2]end = []if len(qi) == len(ou):  # 奇偶互换for i in range(len(qi)):end.append(ou[i])end.append(qi[i])return "0891"+"".join(end)

编码短信内容:

def expand_to_16(ucode): #ascii字符--16-bit编码for i in range(len(ucode)):if (len(ucode[i])==4):ucode[i] = ucode[i][0:2]+"00" + ucode[i][2:]return ucodedef msg_ucs2_encode(src):# 7-bit编码 用于发送普通的ASCII字符,# 8-bit编码 通常用于发送数据消息,# UCS2编码 用于发送Unicode字符。#  UCS2编码  中文:你好-> 4f60597d#  7-bit  hello -> 00680065006c006c006f  英文-> ascii -> 转hex,16进制。补0#  8-bit  1234 -  > 0031003200330034decoder = decoder("utf-8")ucs2 = de())src_len = len(ucs2[0])ucode = []result = ""for i in range(src_len):ucode.append(hex(ord(ucs2[0][i])))ucode = expand_to_16(ucode)for item in ucode:result = result+item[2:]return result

‘’’ 产生PDU 编码’’’

def creatr_Pdu(des, smsc, content):# 收件人 中心号码 内容result = ""type_of_address = "91"  # 国际91,中国小灵通“91”tp_mti = "01"  #tp_mr = "00"  #tp_pid = "00"  # 默认为普通GSM类型,即点到点方式des_len = "0d"  # 目标地址数字个数 共13个十进制数(不包括91和‘F’)alphabet_size = "08"  # 默认用16-bit(7/8/16)编码result += deal_center_iphone_num(smsc)  # 加中心站号码result += tp_mtiresult += tp_mrresult += des_lenresult += type_of_addressdes = deal_iphonenum(des)  # 加收件人号码result += desresult += tp_pidresult += alphabet_size# logger.info("con_len:"+(hex(int(len(content) / 2))[2:]).zfill(2))result += (hex(int(len(content) / 2))[2:]).zfill(2) # !转16进制去除0x 以后补0 0xE->E->0Eresult += contentreturn result.upper()

测试一下:

rev_iphone = "8615928999999"
center_iphone = "8613800280500" # 成都中心号码
masg_content = "坝上雨量缺数,超过2分钟,水位:422.0m/s,最新来数时间:2021-04-15 08:00:00"
masg_content = "1234"
logger.info(creatr_Pdu(rev_iphone, center_iphone, msg_ucs2_encode(masg_content)))

**

串口操作及AT指令发送短信

**

在这里插入代码片

def OpenPort(portx, bps):try:ser = serial.Serial(portx, bps)if (True == ser.is_open):logger.warning("串口" + str(portx) + "打开成功!")return serexcept Exception as (e)def ColsePort(ser):ser.close()logger.warning((ser.name) + "已经关闭!")# 写数据
def DWritePort(ser, data):result = ser.write(data)  # 写数据logger.info(ser)logger.info("Led Write %s(%d)" % (data.hex(), result))return result# 读数据
# 十六进制显示
def hexShow(argv):try:result = ''hLen = len(argv)for i in range(hLen):hvol = argv[i]hhex = '%02x' % hvolresult += hhex+' 'logger.info('Led Read:%s', result)return resultexcept Exception as e:print("---异常---:", e)def DReadPort(ser, way):# 循环接收数据,此为死循环,用线程实现readstr = ""while True:try:# 一个字节一个字节的接收if ser.in_waiting:if (way == 0):for i in range(ser.in_waiting):print("接收ascii数据:" + str(ser.Read_Size(1)))data1 = ser.Read_Size(1).hex()  # 转为十六进制data2 = int(data1, 16)  # 转为十进制if (data2 == "exit"):  # 退出标志breakelse:print("收到数据十六进制:" + data1 + " 收到数据十进制:" + str(data2))if (way == 1):# 整体接收data = ad_all()  # 方式二if (data == "exit"):  # 退出标志breakelse:print("接收ascii数据:", data)except Exception as e:print("异常报错:", e)def deal_iphonenum(demo):  # "8615928789939"if len(demo) % 2 != 0:demo = demo + "F"else:demo = demolistdemo = []for x in range(len(demo)): listdemo.append(demo[x])qi = listdemo[::2]ou = listdemo[1::2]end = []if len(qi) == len(ou):  # 奇偶互换for i in range(len(qi)):end.append(ou[i])end.append(qi[i])return "".join(end)# 解码收件号码
def edncode_iphoneNum(msg):try:iphone = msg[26:38]# logger.info(iphone)listdemo = []for x in range(len(iphone)): listdemo.append(iphone[x])qi = listdemo[::2]ou = listdemo[1::2]end = []for i in range(len(qi)):end.append(ou[i])end.append(qi[i])return "".join(end).replace("F", "").replace("f", "")("获取收件号码失败!")def deal_addr_iphonenum(demo):  # "8613800280500"if len(demo) % 2 != 0:demo = demo + "F"else:demo = demolistdemo = []for x in range(len(demo)): listdemo.append(demo[x])qi = listdemo[::2]  # [1 3]ou = listdemo[1::2]  # [2, 4]end = []if len(qi) == len(ou):  # 奇偶互换for i in range(len(qi)):end.append(ou[i])end.append(qi[i])("号码有误")  # 91 国际 81 小灵通return str(hex(int(len("91" + "".join(end)) / 2)).replace("0x", "").zfill(2)) + "91" + "".join(end)# 中文转unicode  # 工作愉快 "u5de5u4f5cu6109u5febuff01"
def deal_msg(demo):return str(json.dumps(demo)).replace("\u", "").upper()def read_lines_from_serialPort(ser):array = []while 1:# ret = adline()# ret = ad(ser.in_waiting)ret = adline()ret = ret.decode().strip()if len(ret) == 0 or ret is None:time.sleep(0.5)continueelse:array.append(ret)logger.info(ret)dswith("OK"):dswith(">"):breakif ret.startswith("+CREG"):breakreturn array# 获取AT指令返回信息
def get_return_data_Msg(ser,str):# ser = serial.Serial('COM3', 115200)logger.info(ser.is_open)ser.flushInput()# at+cmgf?r:0 pdu  1 txtlogger.info("AT指令:" + str)ser.de())while True:count = ser.inWaiting()  # 获取串口缓冲区数据if count != 0:recv = ad(ser.in_waiting).decode().strip().replace("r", "").replace("n", "")return recvtime.sleep(0.2)# 获取返回信息
def get_data_Msg(ser1):time_now = w()while True:count = ser1.inWaiting()  # 获取串口缓冲区数据if (w() - time_now).total_seconds() > 3:return "TIME_OUT"breakif count != 0:recv = ad(ser1.in_waiting).decode().strip().replace("r", "").replace("n", "")return recvbreaktime.sleep(0.2)# CMS是 短信中心SMSC的返回错误
# at+cmgf?r:0 pdu  1 txt
# AT+CMGD
# AT+CMGL =4 列出所有信息
@logger.catch
def deleteMsg(serList):ser = serial.Serial(serList['portName'], serList['baudRate'])logger.info("串口开启状态:" + str(ser.is_open))# logger.info(ser.is_open)ser.flushInput()steDemo = "AT+CMGD=1,4r"ser.de())while True:count = ser.inWaiting()  # 获取串口缓冲区数据if count != 0:recv = ad(ser.in_waiting).decode().strip().replace("r", "").replace("n", "")if recv == "OK":logger.warning("短信删除成功!")ser.close()("短信删除失败!")ser.close()breaktime.sleep(0.2)# flag = ser.is_open# global isBusy# if flag == False:#     ('delete_串口未打开!')#     return 0# if isBusy:#     ('delete_串口繁忙!')#     return 0## try:#     cmd_cmgd = "AT+CMGD=1,4r"#     ser.write(de())#     ser.flushInput()#     arr2 = read_lines_from_serialPort(ser)#     for s2 in arr2:#         logger.warning(s2)##     isBusy = True#     logger.warning("++ delete ++")#     while 1:#         delMode = "0"#         cmd = "at+cmgf?r"#         ser.de())#         arr = read_lines_from_serialPort(ser)#         for s1 in arr:#             if s1.startswith("+CMGF:"):#                 fmt = s1.split(":")[1].strip()#                 if fmt == "1":#                     delMode = "ALL"#         ##########################################################         logger.warning("delMode = " + delMode)#         listTemp = []#         cmd_cmgl = "at+cmgl=4r"#         if delMode == "ALL":#             cmd_cmgl = 'at+cmgl="ALL"r'##         logger.warning(cmd_cmgl)#         ser.write(de())#         # ser.flush()#         arr2 = read_lines_from_serialPort(ser)#         for s2 in arr2:#             if s2.startswith("+CMGL:"):#                 sms_index = s2.split(":")[1].strip().split(",")[0]#                 listTemp.append(sms_index)#         if len(listTemp) > 0:#             for idx in listTemp:#                 cmd_cmgd = "AT+CMGD=%sr" % (idx)#                 ser.write(de())#                 # ser.flushInput()#             logger.info("已经删除短信:" + str(len(listTemp)) + "条")#         else:#             logger.info("无信息可删除!")#             break#         isBusy = False# except Exception as ex:#     ("delete fail" + ex)# finally:#     isBusy = False#     pass@logger.catch
def send(serList, pdumsg, textMsg):try:ser = serial.Serial(serList['portName'], serList['baudRate'])if ser.is_open == True:logger.info("串口开启正常")smgdLen = int((int(len(pdumsg)) - 18) / 2)cmd_cmgs = "at+cmgs=" + str(smgdLen)logger.info(cmd_cmgs)ser.write((cmd_cmgs + "r").encode())time.sleep(1)ser.de())time.sleep(1)  # 'x1Ar'ser.write('x1Ar'.encode())time.sleep(1)endDemo = get_data_Msg(ser)logger.info(endDemo)if "ERROR" not in endDemo or endDemo is "TIME_OUT":logger.warning("发送成功!")logger.info("收信人:" + edncode_iphoneNum(pdumsg))logger.info("短信内容:" + de('latin-1').decode('gbk'))ser.close()return 1else:return 0ser.close()("串口未打开"+str(ser.is_open))except Exception as ("send_短信发送异常!" + ex)return 0@logger.catch
def getDblink(ip, user, pas, dbname):try:conn = t(str(ip), str(user), str(pas), str(dbname), login_timeout=5)if conn:logger.warning("LINK DB OK !")return ("LINK DB ERROR!")except Exception as ("LINK DB ERROR!")(e)finally:pass@logger.catch
def updateMagstate(conn, pdumsg, time_limit):try:cur = conn.cursor()sqlUpdate = "update SYS_SMS_PDU set is_send=1 where pdu_msg= '%s' and create_time > '%s' " % (pdumsg, time_ute(sqlUpdate)connmit()if (wcount) > 0:logger.info("update table success")("update error")#7-bit编码用于发送普通的ASCII字符,
# 8-bit编码通常用于发送数据消息,
# UCS2编码用于发送Unicode字符。
@logger.catch
def getMsg(serList,conn, maxmum):logger.info("开始 获取 需要 发送的 msg")global douplerecordcur = conn.cursor()time_inter_now = w()offset_one = datetime.timedelta(hours=pduInterval)time_pdu_limit = (time_inter_now - offset_one).strftime('%Y-%m-%d %H:%M:%S')sqlSelect = "select x.warning_msg,x.pdu_msg from SYS_SMS_PDU x where x.is_send=0 and create_time > '%s';" % (time_pdu_ute(sqlSelect)resulte = cur.fetchall()count = douplerecord['count']logger.warning(len(resulte))if len(resulte) > 0:logger.warning("需要发送短信条数:" + str(len(resulte)))if int(len(resulte) + douplerecord['count']) >= ("今日信息已达最大限度,不发送")else:logger.info("今日信息未达最大限度,开始发送")for i in range(len(resulte)):ifSuccess = send(serList,resulte[i][1], resulte[i][0])if ifSuccess == 1:updateMagstate(coon, resulte[i][1], time_pdu_limit) # update 0-1count = count + 1 # 发1 加 1douplerecord['count'] = counttimeTemp = w().strftime('%Y-%m-%d')if timeTemp > douplerecord['time']: # 时间来到明天,修改内存时间、count 置0,,douplerecord['time'] = timeTempmodify_config('dbconfig.json', timeTemp, count)douplerecord['count'] = 1logger.warning(douplerecord['time']+"  已发短信数目:  " + str(douplerecord['count']))("短信发送异常——getMsg!")# logger.info(" 发送完成! ")logger.info("finish . waiting  next  r")else:logger.warning("无短信需要发送")timeTemp = w().strftime('%Y-%m-%d')if timeTemp > douplerecord['time']:  # 时间来到明天,修改内存时间、count 置0,,douplerecord['time'] = timeTempmodify_config('dbconfig.json', douplerecord['time'], count)logger.info("finish . waiting  next  start ........ r")@logger.catch
def modify_config(input_json_file, date, count):with open(input_json_file, "r", encoding='utf-8-sig') as load_f:data = json.load(load_f)data["daynum"]['time'] = datedata["daynum"]['count'] = countwith open(input_json_file, "w") as dump_f:json.dump(data, dump_f)logger.info("修改 config 文件 成功")def is_chinese(s):"""check the string have chinese or not"""rt = Falseif s >= u"u4e00" and s <= u"u9fa6":rt = Truereturn rt

本文发布于:2024-01-31 13:12:58,感谢您对本站的认可!

本文链接:https://www.4u4v.net/it/170667797928775.html

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。

标签:串口   发送短信   PDU
留言与评论(共有 0 条评论)
   
验证码:

Copyright ©2019-2022 Comsenz Inc.Powered by ©

网站地图1 网站地图2 网站地图3 网站地图4 网站地图5 网站地图6 网站地图7 网站地图8 网站地图9 网站地图10 网站地图11 网站地图12 网站地图13 网站地图14 网站地图15 网站地图16 网站地图17 网站地图18 网站地图19 网站地图20 网站地图21 网站地图22/a> 网站地图23