PDU Mode被所有手机支持,可以使用任何字符集,这也是手机默认的编码方式。网上C语言的编码实例很多,基于python写的不多,特基于python3.7实现AT指令发送短信,以及PDU编码,串口操作。
PDU编码规则网上资源很多,在线编码地址:
在线PDU
可以用于比对调试。
直接上代码。
# 处理收件人 “8613800280500” -> “683108200805F0”
def deal_iphonenum(demo): # 收件人 "8613800280500" -> "683108200805F0"if len(demo) % 2 != 0:demo = demo + "F"else:demo = demolistdemo = []for x in range(len(demo)): listdemo.append(demo[x])qi = listdemo[::2]ou = listdemo[1::2]end = []if len(qi) == len(ou): # 奇偶互换for i in range(len(qi)):end.append(ou[i])end.append(qi[i])return "".join(end)
中心号码 “8613800280500” -># “0891683108200805F0”
def deal_center_iphone_num(demo):# 中心号码 "8613800280500" -># "0891683108200805F0"if len(demo) % 2 != 0:demo = demo + "F"else:demo = demolistdemo = []for x in range(len(demo)): listdemo.append(demo[x])qi = listdemo[::2]ou = listdemo[1::2]end = []if len(qi) == len(ou): # 奇偶互换for i in range(len(qi)):end.append(ou[i])end.append(qi[i])return "0891"+"".join(end)
编码短信内容:
def expand_to_16(ucode): #ascii字符--16-bit编码for i in range(len(ucode)):if (len(ucode[i])==4):ucode[i] = ucode[i][0:2]+"00" + ucode[i][2:]return ucodedef msg_ucs2_encode(src):# 7-bit编码 用于发送普通的ASCII字符,# 8-bit编码 通常用于发送数据消息,# UCS2编码 用于发送Unicode字符。# UCS2编码 中文:你好-> 4f60597d# 7-bit hello -> 00680065006c006c006f 英文-> ascii -> 转hex,16进制。补0# 8-bit 1234 - > 0031003200330034decoder = decoder("utf-8")ucs2 = de())src_len = len(ucs2[0])ucode = []result = ""for i in range(src_len):ucode.append(hex(ord(ucs2[0][i])))ucode = expand_to_16(ucode)for item in ucode:result = result+item[2:]return result
‘’’ 产生PDU 编码’’’
def creatr_Pdu(des, smsc, content):# 收件人 中心号码 内容result = ""type_of_address = "91" # 国际91,中国小灵通“91”tp_mti = "01" #tp_mr = "00" #tp_pid = "00" # 默认为普通GSM类型,即点到点方式des_len = "0d" # 目标地址数字个数 共13个十进制数(不包括91和‘F’)alphabet_size = "08" # 默认用16-bit(7/8/16)编码result += deal_center_iphone_num(smsc) # 加中心站号码result += tp_mtiresult += tp_mrresult += des_lenresult += type_of_addressdes = deal_iphonenum(des) # 加收件人号码result += desresult += tp_pidresult += alphabet_size# logger.info("con_len:"+(hex(int(len(content) / 2))[2:]).zfill(2))result += (hex(int(len(content) / 2))[2:]).zfill(2) # !转16进制去除0x 以后补0 0xE->E->0Eresult += contentreturn result.upper()
测试一下:
rev_iphone = "8615928999999"
center_iphone = "8613800280500" # 成都中心号码
masg_content = "坝上雨量缺数,超过2分钟,水位:422.0m/s,最新来数时间:2021-04-15 08:00:00"
masg_content = "1234"
logger.info(creatr_Pdu(rev_iphone, center_iphone, msg_ucs2_encode(masg_content)))
**
**
在这里插入代码片
def OpenPort(portx, bps):try:ser = serial.Serial(portx, bps)if (True == ser.is_open):logger.warning("串口" + str(portx) + "打开成功!")return serexcept Exception as (e)def ColsePort(ser):ser.close()logger.warning((ser.name) + "已经关闭!")# 写数据
def DWritePort(ser, data):result = ser.write(data) # 写数据logger.info(ser)logger.info("Led Write %s(%d)" % (data.hex(), result))return result# 读数据
# 十六进制显示
def hexShow(argv):try:result = ''hLen = len(argv)for i in range(hLen):hvol = argv[i]hhex = '%02x' % hvolresult += hhex+' 'logger.info('Led Read:%s', result)return resultexcept Exception as e:print("---异常---:", e)def DReadPort(ser, way):# 循环接收数据,此为死循环,用线程实现readstr = ""while True:try:# 一个字节一个字节的接收if ser.in_waiting:if (way == 0):for i in range(ser.in_waiting):print("接收ascii数据:" + str(ser.Read_Size(1)))data1 = ser.Read_Size(1).hex() # 转为十六进制data2 = int(data1, 16) # 转为十进制if (data2 == "exit"): # 退出标志breakelse:print("收到数据十六进制:" + data1 + " 收到数据十进制:" + str(data2))if (way == 1):# 整体接收data = ad_all() # 方式二if (data == "exit"): # 退出标志breakelse:print("接收ascii数据:", data)except Exception as e:print("异常报错:", e)def deal_iphonenum(demo): # "8615928789939"if len(demo) % 2 != 0:demo = demo + "F"else:demo = demolistdemo = []for x in range(len(demo)): listdemo.append(demo[x])qi = listdemo[::2]ou = listdemo[1::2]end = []if len(qi) == len(ou): # 奇偶互换for i in range(len(qi)):end.append(ou[i])end.append(qi[i])return "".join(end)# 解码收件号码
def edncode_iphoneNum(msg):try:iphone = msg[26:38]# logger.info(iphone)listdemo = []for x in range(len(iphone)): listdemo.append(iphone[x])qi = listdemo[::2]ou = listdemo[1::2]end = []for i in range(len(qi)):end.append(ou[i])end.append(qi[i])return "".join(end).replace("F", "").replace("f", "")("获取收件号码失败!")def deal_addr_iphonenum(demo): # "8613800280500"if len(demo) % 2 != 0:demo = demo + "F"else:demo = demolistdemo = []for x in range(len(demo)): listdemo.append(demo[x])qi = listdemo[::2] # [1 3]ou = listdemo[1::2] # [2, 4]end = []if len(qi) == len(ou): # 奇偶互换for i in range(len(qi)):end.append(ou[i])end.append(qi[i])("号码有误") # 91 国际 81 小灵通return str(hex(int(len("91" + "".join(end)) / 2)).replace("0x", "").zfill(2)) + "91" + "".join(end)# 中文转unicode # 工作愉快 "u5de5u4f5cu6109u5febuff01"
def deal_msg(demo):return str(json.dumps(demo)).replace("\u", "").upper()def read_lines_from_serialPort(ser):array = []while 1:# ret = adline()# ret = ad(ser.in_waiting)ret = adline()ret = ret.decode().strip()if len(ret) == 0 or ret is None:time.sleep(0.5)continueelse:array.append(ret)logger.info(ret)dswith("OK"):dswith(">"):breakif ret.startswith("+CREG"):breakreturn array# 获取AT指令返回信息
def get_return_data_Msg(ser,str):# ser = serial.Serial('COM3', 115200)logger.info(ser.is_open)ser.flushInput()# at+cmgf?r:0 pdu 1 txtlogger.info("AT指令:" + str)ser.de())while True:count = ser.inWaiting() # 获取串口缓冲区数据if count != 0:recv = ad(ser.in_waiting).decode().strip().replace("r", "").replace("n", "")return recvtime.sleep(0.2)# 获取返回信息
def get_data_Msg(ser1):time_now = w()while True:count = ser1.inWaiting() # 获取串口缓冲区数据if (w() - time_now).total_seconds() > 3:return "TIME_OUT"breakif count != 0:recv = ad(ser1.in_waiting).decode().strip().replace("r", "").replace("n", "")return recvbreaktime.sleep(0.2)# CMS是 短信中心SMSC的返回错误
# at+cmgf?r:0 pdu 1 txt
# AT+CMGD
# AT+CMGL =4 列出所有信息
@logger.catch
def deleteMsg(serList):ser = serial.Serial(serList['portName'], serList['baudRate'])logger.info("串口开启状态:" + str(ser.is_open))# logger.info(ser.is_open)ser.flushInput()steDemo = "AT+CMGD=1,4r"ser.de())while True:count = ser.inWaiting() # 获取串口缓冲区数据if count != 0:recv = ad(ser.in_waiting).decode().strip().replace("r", "").replace("n", "")if recv == "OK":logger.warning("短信删除成功!")ser.close()("短信删除失败!")ser.close()breaktime.sleep(0.2)# flag = ser.is_open# global isBusy# if flag == False:# ('delete_串口未打开!')# return 0# if isBusy:# ('delete_串口繁忙!')# return 0## try:# cmd_cmgd = "AT+CMGD=1,4r"# ser.write(de())# ser.flushInput()# arr2 = read_lines_from_serialPort(ser)# for s2 in arr2:# logger.warning(s2)## isBusy = True# logger.warning("++ delete ++")# while 1:# delMode = "0"# cmd = "at+cmgf?r"# ser.de())# arr = read_lines_from_serialPort(ser)# for s1 in arr:# if s1.startswith("+CMGF:"):# fmt = s1.split(":")[1].strip()# if fmt == "1":# delMode = "ALL"# ########################################################## logger.warning("delMode = " + delMode)# listTemp = []# cmd_cmgl = "at+cmgl=4r"# if delMode == "ALL":# cmd_cmgl = 'at+cmgl="ALL"r'## logger.warning(cmd_cmgl)# ser.write(de())# # ser.flush()# arr2 = read_lines_from_serialPort(ser)# for s2 in arr2:# if s2.startswith("+CMGL:"):# sms_index = s2.split(":")[1].strip().split(",")[0]# listTemp.append(sms_index)# if len(listTemp) > 0:# for idx in listTemp:# cmd_cmgd = "AT+CMGD=%sr" % (idx)# ser.write(de())# # ser.flushInput()# logger.info("已经删除短信:" + str(len(listTemp)) + "条")# else:# logger.info("无信息可删除!")# break# isBusy = False# except Exception as ex:# ("delete fail" + ex)# finally:# isBusy = False# pass@logger.catch
def send(serList, pdumsg, textMsg):try:ser = serial.Serial(serList['portName'], serList['baudRate'])if ser.is_open == True:logger.info("串口开启正常")smgdLen = int((int(len(pdumsg)) - 18) / 2)cmd_cmgs = "at+cmgs=" + str(smgdLen)logger.info(cmd_cmgs)ser.write((cmd_cmgs + "r").encode())time.sleep(1)ser.de())time.sleep(1) # 'x1Ar'ser.write('x1Ar'.encode())time.sleep(1)endDemo = get_data_Msg(ser)logger.info(endDemo)if "ERROR" not in endDemo or endDemo is "TIME_OUT":logger.warning("发送成功!")logger.info("收信人:" + edncode_iphoneNum(pdumsg))logger.info("短信内容:" + de('latin-1').decode('gbk'))ser.close()return 1else:return 0ser.close()("串口未打开"+str(ser.is_open))except Exception as ("send_短信发送异常!" + ex)return 0@logger.catch
def getDblink(ip, user, pas, dbname):try:conn = t(str(ip), str(user), str(pas), str(dbname), login_timeout=5)if conn:logger.warning("LINK DB OK !")return ("LINK DB ERROR!")except Exception as ("LINK DB ERROR!")(e)finally:pass@logger.catch
def updateMagstate(conn, pdumsg, time_limit):try:cur = conn.cursor()sqlUpdate = "update SYS_SMS_PDU set is_send=1 where pdu_msg= '%s' and create_time > '%s' " % (pdumsg, time_ute(sqlUpdate)connmit()if (wcount) > 0:logger.info("update table success")("update error")#7-bit编码用于发送普通的ASCII字符,
# 8-bit编码通常用于发送数据消息,
# UCS2编码用于发送Unicode字符。
@logger.catch
def getMsg(serList,conn, maxmum):logger.info("开始 获取 需要 发送的 msg")global douplerecordcur = conn.cursor()time_inter_now = w()offset_one = datetime.timedelta(hours=pduInterval)time_pdu_limit = (time_inter_now - offset_one).strftime('%Y-%m-%d %H:%M:%S')sqlSelect = "select x.warning_msg,x.pdu_msg from SYS_SMS_PDU x where x.is_send=0 and create_time > '%s';" % (time_pdu_ute(sqlSelect)resulte = cur.fetchall()count = douplerecord['count']logger.warning(len(resulte))if len(resulte) > 0:logger.warning("需要发送短信条数:" + str(len(resulte)))if int(len(resulte) + douplerecord['count']) >= ("今日信息已达最大限度,不发送")else:logger.info("今日信息未达最大限度,开始发送")for i in range(len(resulte)):ifSuccess = send(serList,resulte[i][1], resulte[i][0])if ifSuccess == 1:updateMagstate(coon, resulte[i][1], time_pdu_limit) # update 0-1count = count + 1 # 发1 加 1douplerecord['count'] = counttimeTemp = w().strftime('%Y-%m-%d')if timeTemp > douplerecord['time']: # 时间来到明天,修改内存时间、count 置0,,douplerecord['time'] = timeTempmodify_config('dbconfig.json', timeTemp, count)douplerecord['count'] = 1logger.warning(douplerecord['time']+" 已发短信数目: " + str(douplerecord['count']))("短信发送异常——getMsg!")# logger.info(" 发送完成! ")logger.info("finish . waiting next r")else:logger.warning("无短信需要发送")timeTemp = w().strftime('%Y-%m-%d')if timeTemp > douplerecord['time']: # 时间来到明天,修改内存时间、count 置0,,douplerecord['time'] = timeTempmodify_config('dbconfig.json', douplerecord['time'], count)logger.info("finish . waiting next start ........ r")@logger.catch
def modify_config(input_json_file, date, count):with open(input_json_file, "r", encoding='utf-8-sig') as load_f:data = json.load(load_f)data["daynum"]['time'] = datedata["daynum"]['count'] = countwith open(input_json_file, "w") as dump_f:json.dump(data, dump_f)logger.info("修改 config 文件 成功")def is_chinese(s):"""check the string have chinese or not"""rt = Falseif s >= u"u4e00" and s <= u"u9fa6":rt = Truereturn rt
本文发布于:2024-01-31 13:12:58,感谢您对本站的认可!
本文链接:https://www.4u4v.net/it/170667797928775.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
留言与评论(共有 0 条评论) |