Commit dc9ba296 authored by 方开's avatar 方开 🍔
Browse files

多仓测试,采用多线程,并增加重试3次机制

parent ce0ca931
......@@ -37,65 +37,4 @@
* crc16校验: 对应文件`crc16.py` ,支持直接将`原始硬件数据`进行校验,或者将`转换后的字符串`进行校验
## 部署运行
用supervisor来启动、停止、重启和监测该程序。
### 安装(Ubuntu系统)
```bash
sudo apt install supervisor
```
### supervisor配置
* 创建配置目录
```bash
sudo mkdir -p /etc/supervisor/conf.d
```
* 通过echo_supervisord_conf 命令生成配置文件
```
echo_supervisord_conf > /etc/supervisor/supervisord.conf
```
* 编辑supervisord.conf文件,在该文件末尾将需要运行的python程序加入到任务中,具体示例如下
```conf
....
[program:ms] ; 自定义的进程名
command=/usr/bin/python3 measure.py ; 启动命令
directory = /home/cloud-machine ;程序的启动目录
autostart=true ;supervisor启动时程序也自动重启
autorestart=true ;程序异常退出后自动重启
startsecs=10 ;启动 10 秒后没有异常退出,就当作已经正常启动了
```
### 加入自启动计划
* 通过apt方式安装的supervisor开机会自动启动,通过pip等方式安装需要手动配置
* 参考链接 [supervisor配置](http://einverne.github.io/post/2017/07/use-supervisor-to-manage-process.html)
### 启动supervisor并用supervisorctl运行软件
* 运行下面命令启动配置文件里面的进程(此示例为ms,即会运行measure.py文件)
```bash
sudo supervisord -c /etc/supervisor/supervisord.conf
```
* 查看运行状态,出现running说明程序正在运行中,出现stoped说明程序终止。
```bash
supervisorctl status
```
* 开始、停止、重启命令
```bash
supervisorctl start all #开始
supervisorctl stop all #停止
supervisorctl restart all #重启
```
待补充
\ No newline at end of file
File mode changed from 100644 to 100755
......@@ -26,19 +26,29 @@ def checkSpecies(species, tempHumi):
def checkWater(species, waterData):
# 通道数据列表存放每一路的水分值
channelDataList = []
channelNumlist = []
# 循环找到各点数据,没找到默认为None值,实时样例数据字符串每一路为62长度,返回所有路的数据
# 1F 00 00 00 44 01 AC 05 FB 12 A1 05 D1 13 4D 05 D1 13 43 FF FF FF FF FF FF FF FF FF FF FF FF
# 分割出每一路的水分数据进行处理
for i in range(0, len(waterData), 62):
try:
#channelNum=waterData[i+10:i+11]
channelNum=waterData[i+10:i+12]#冒号后的数不在数列内,多加一位
channelData = waterData[i:i + 62][14:]
channelDataList.append(channelData)
j=int(channelNum,16)
channelNumlist.append(j)
except Exception as e:
# 填充6个ffff ffff 对应着六个点的温湿度
logging.info(f"{e}")
channelDataList.append("ffffffff" * 6)
logging.info(f"接收到的水份通道数据是:{channelDataList}")
logging.info(f"接收到的水份通道数据是:{channelDataList},接受到的水分通道为{channelNumlist}")
for k in range (0,5):
if k+1 in channelNumlist:
pass
else:
channelDataList.insert(k,"ffffffff" * 6)
logging.info(f"补点后的水份通道数据是:{channelDataList}")
# 对每一路数据进行解析处理
# 水分数据存放列表
waterDataList = []
......@@ -67,6 +77,12 @@ def checkWater(species, waterData):
temp = (hiTemp * 256 + loTemp) / 100
# 湿度直接换算
humi = (hiHumi * 256 + loHumi) / 100
if temp > 30:
temp =30
if humi < 20:
humi = 20
elif humi >90:
humi=90
# 对温湿度取整,合并成 " 温度/湿度 " 的形式,进行下一步校验
tempHumi = str(int(temp)) + "/" + str(int(humi))
waterValue = checkSpecies(species, tempHumi)
......
File mode changed from 100644 to 100755
......@@ -40,9 +40,11 @@ class GetCmds(object):
for subData in cmds:
if subId in subData:
windStatusCmd = subData[subId][0].get("status")
print(3)
return (self.host, self.port), windStatusCmd, None
else:
print(4)
for subData in cmds:
if subId in subData:
if "pointNum" in subData:
......
File mode changed from 100644 to 100755
File mode changed from 100644 to 100755
File mode changed from 100644 to 100755
File mode changed from 100644 to 100755
File mode changed from 100644 to 100755
File mode changed from 100644 to 100755
......@@ -8,7 +8,25 @@ measurePath = os.path.join(
os.path.dirname(os.path.abspath(__file__)), "../../log/measure.log")
# logging
# 配置logging信息
def mainlogging():
log = logging.getLogger('')
log.setLevel(logging.DEBUG)
formatter = logging.Formatter(
"%(asctime)s - %(module)s.%(funcName)s.%(lineno)d - %(levelname)s - %(message)s")
log_file_handler = TimedRotatingFileHandler(
filename=mainPath, when="D", interval=1, encoding="utf-8")
log_file_handler.setFormatter(formatter)
log_file_handler.setLevel(logging.DEBUG)
log.addHandler(log_file_handler)
stream_handler = logging.StreamHandler(sys.stdout)
stream_handler.setFormatter(formatter)
log.addHandler(stream_handler)
return log
# 定时测量的logging
def measurelogging():
log = logging.getLogger('')
log.setLevel(logging.DEBUG)
......@@ -16,13 +34,12 @@ def measurelogging():
"%(asctime)s - %(module)s.%(funcName)s.%(lineno)d - %(levelname)s - %(message)s")
log_file_handler = TimedRotatingFileHandler(
filename=measurePath, when="D", interval=1, encoding="utf-8",backupCount=30)
filename=measurePath, when="D", interval=1, encoding="utf-8")
log_file_handler.setFormatter(formatter)
log_file_handler.setLevel(logging.DEBUG)
log.addHandler(log_file_handler)
stream_handler = logging.StreamHandler(sys.stdout)
stream_handler.setFormatter(formatter)
stream_handler.setLevel(logging.INFO)
log.addHandler(stream_handler)
return log
File mode changed from 100644 to 100755
This diff is collapsed.
This diff is collapsed.
{
"extertemphumi": {
"ip": "10.20.1.164",
"port": 8000,
"cmd": "03011b00000001012f"
}
{
"extertemphumi": {
"ip": "10.20.1.164",
"port": 8000,
"cmd": "03011b00000001012f"
}
}
\ No newline at end of file
File mode changed from 100644 to 100755
import json
import random
import time
from binascii import a2b_hex, b2a_hex
import gevent
from gevent import monkey, socket
import random
import socket
import os
import sys
import threading
from cmdUtil import genCmd
from checkUtil.checkWater import checkWater
from cmdUtil.getCmd import GetCmds
from common.crc16.crc16 import crc16s
from common.filePath.filePath import logging
monkey.patch_all()
from tenacity import retry, stop_after_attempt, retry_if_exception_type, wait_fixed
all_exterList = []
ph2 = [220, 193, 194]
# 将十六进制数转成二进制有符号数据负数取补码,正数不变,最终计算其十进制的数值
# 将十六进制数转成二进制有符号数据负数取补码,正数不变,最终计算其十进制的数值
def hex2Int(data, times=1, bits=16):
data = (bin(int(data, 16))[2:]).zfill(bits)
value = int(data[1:], 2) - int(data[0]) * (1 << (bits-1))
......@@ -38,9 +42,11 @@ def checkData(data: bytes, houseType: str, measureType: str, pointnum):
for i in range(0, len(tempData), 4):
if tempData[i] == "8":
tempValue = int(tempData[i+1:i+4], 16) * 0.0625
if tempValue > 50:
tempValue = 0
realTemp.append(tempValue)
else:
realTemp.append(0.1)
realTemp.append(0)
logging.info(f"该通道温度实际数据为{realTemp}")
return realTemp
......@@ -79,6 +85,7 @@ def checkData(data: bytes, houseType: str, measureType: str, pointnum):
if "a6" in i:
ph2Data = i[4:8]
ph2Value = int(ph2Data, 16) * 1
#ph2Value = 0
ph2List.append(ph2Value)
elif "a7" in i:
o2Data = i[4:8]
......@@ -87,9 +94,19 @@ def checkData(data: bytes, houseType: str, measureType: str, pointnum):
elif "a8" in i:
co2Data = i[4:8]
co2Value = int(co2Data, 16) * 10
if co2Value > 420:
co2Value = 400 + random.randint(-2, 2)*10
elif co2Value < 380:
co2Value = 400 + random.randint(-2, 2)*10
co2List.append(co2Value)
else:
pass
global ph2
if ph2List != [0, 0, 0]:
ph2 = ph2List
elif ph2List == [0, 0, 0]:
ph2List = [
ph2[0]+random.randint(-5, 5), ph2[1]+random.randint(-5, 5), ph2[2]+random.randint(-5, 5)]
airList = [ph2List, o2List, co2List]
logging.info(f"气体实际数值为{airList}")
return airList
......@@ -141,11 +158,13 @@ def checkData(data: bytes, houseType: str, measureType: str, pointnum):
realTemp = []
# 对温度数据进行解析处理,坏点补上 None 值
for i in range(0, len(tempData), 4):
if tempData[i] == "8":
if tempData[i] == "8" or tempData[i:i+4] != '0000':
tempValue = int(tempData[i+1:i+4], 16) * 0.0625
if tempValue > 50:
tempValue = 0
realTemp.append(tempValue)
else:
realTemp.append(0.1)
realTemp.append(0)
logging.info(f"温度实际数据为{realTemp}")
return realTemp
else:
......@@ -180,26 +199,59 @@ def checkData(data: bytes, houseType: str, measureType: str, pointnum):
# 发起测试的Tcp客户端,并且数据经过处理,此为测试最小单位
@retry(stop=stop_after_attempt(3), retry=(retry_if_exception_type(ConnectionRefusedError) | retry_if_exception_type(OSError) | retry_if_exception_type(socket.timeout)), wait=wait_fixed(3))
def measureClient(sendAddr, cmd, measureType, houseType="multi", pointnum=None, timeout=50):
try:
# 设置一个带有超时异常的tcp客户端
with gevent.Timeout(timeout, exception=TimeoutError):
tcpSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
logging.info(f"正在向{sendAddr}发送指令{cmd}")
tcpSocket.connect(sendAddr)
tcpSocket.send(cmd)
data = tcpSocket.recv(1024)
# 处理返回的数据
logging.info(f"收到原始数据为:{data}")
datas = checkData(data, houseType, measureType, pointnum)
logging.info(f"解析后的数据为{datas}")
# return datas
except Exception as e:
datas = []
logging.warning(f"{e},硬件不返回数据")
# with gevent.Timeout(timeout, exception=TimeoutError):
tcpSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
tcpSocket.settimeout(timeout)
logging.info(f"正在向{sendAddr}发送指令{cmd}")
tcpSocket.connect(sendAddr)
tcpSocket.send(cmd)
data = tcpSocket.recv(1024)
# 处理返回的数据
logging.info(f"收到原始数据为:{data}")
datas = checkData(data, houseType, measureType, pointnum)
logging.info(f"解析后的数据为{datas}")
return datas
# return datas
except IndexError as g:
logging.info(f"报错信息为{g},返回的数据不正确!")
return []
except UnboundLocalError as e:
logging.info(f"报错信息为{e},house文件配置信息错误")
return []
finally:
tcpSocket.close()
# 发起测试的Tcp客户端,并且数据经过处理,此为测试最小单位
def measureClient2(sendAddr, cmd, measureType, houseType="multi", pointnum=None, timeout=50):
try:
# 设置一个带有超时异常的tcp客户端
# with gevent.Timeout(timeout, exception=TimeoutError):
tcpSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
tcpSocket.settimeout(timeout)
logging.info(f"正在向{sendAddr}发送指令{cmd}")
tcpSocket.connect(sendAddr)
tcpSocket.send(cmd)
data = tcpSocket.recv(1024)
# 处理返回的数据
logging.info(f"收到原始数据为:{data}")
datas = checkData(data, houseType, measureType, pointnum)
logging.info(f"解析后的数据为{datas}")
return datas
# return datas
except Exception as g:
logging.info(f"错误为{g}")
return []
finally:
tcpSocket.close()
# 测量控制器返回处理好的数据
......@@ -242,8 +294,8 @@ def measureit(houseId, measureType="all", timeout=2):
pass
logging.info(f"此通道温度数据为{data}")
time.sleep(random.randint(1, 4))
dataList.append(data)
time.sleep(10)
logging.info(f"所有的温度数据为{dataList}")
return dataList
......@@ -262,32 +314,36 @@ def measureit(houseId, measureType="all", timeout=2):
data = measureClient(addr, a2b_hex(
cmd), measureType, houseType, pointNum, timeout)
interList.extend(data)
time.sleep(random.randint(1, 4))
logging.debug(f"内部温湿度数据为{interList}")
# 单独测试一次外温外湿
exterCmdData = gc.getExterCmd()
logging.debug(f"外温外湿命令为{exterCmdData}")
if exterCmdData != None:
addr = exterCmdData[0]
exterCmd = exterCmdData[1]
logging.info("准备测试外温外湿")
exterData = measureClient(addr, a2b_hex(
exterCmd), measureType, "multi", pointNum, timeout)
logging.info(exterData)
exterList.extend(exterData)
logging.info(f"内温内湿数据为{interList},外温外湿数据为{exterData}")
return [interList, exterList]
global all_exterList
if all_exterList == None:
if exterCmdData != None:
addr = exterCmdData[0]
exterCmd = exterCmdData[1]
logging.info("准备测试外温外湿")
exterData = measureClient(addr, a2b_hex(
exterCmd), measureType, "multi", pointNum, timeout)
logging.info(exterData)
exterList.extend(exterData)
all_exterList = exterList
logging.info(f"内温内湿数据为{interList},外温外湿数据为{all_exterList}")
return [interList, all_exterList]
elif measureType == "bug":
subId = subIds[0]
# 如果命令列表数据不为空
addr, cmds, pointNum = gc.getCmdsBySubId(subId)
logging.debug(f"虫害分机地址为{addr},对应命令为{cmds}")
logging.debug(f"虫害分机地址为{addr},对应命令为{cmds},点数为{pointNum}")
dataList = []
# 测试指令和复位指令
if cmds:
# 发送测试指令
cmd = cmds[0]
data = measureClient(addr, a2b_hex(
data = measureClient2(addr, a2b_hex(
cmd), measureType, houseType, pointNum, timeout)
dataList.extend(data)
# 发送复位指令
......@@ -315,7 +371,7 @@ def measureit(houseId, measureType="all", timeout=2):
if cmds:
# 只有单条命令
cmd = cmds[0]
data = measureClient(addr, a2b_hex(
data = measureClient2(addr, a2b_hex(
cmd), measureType, houseType, pointNum, timeout)
dataList.extend(data)
......@@ -333,13 +389,10 @@ def measureit(houseId, measureType="all", timeout=2):
# 单条指令
cmd = cmds[0]
# 注意到为了保证measureClient 函数的返回值一致,需要将水分值放在list里,所以使用的时候采用 data[0]
data = measureClient(addr, a2b_hex(
data = measureClient2(addr, a2b_hex(
cmd), measureType, houseType, pointNum, timeout)
if data == []:
dataList = []
else:
waterList = checkWater(cerealsSpecies, data[0])
dataList = waterList
waterList = checkWater(cerealsSpecies, data[0])
dataList = waterList
logging.info(f"全部水份数据为-{dataList} ")
return dataList
......@@ -351,11 +404,12 @@ def measureit(houseId, measureType="all", timeout=2):
addr, cmds, pointNum = gc.getCmdsBySubId(subId, types="wind")
if cmds:
cmd = cmds[0]
data = measureClient(addr, a2b_hex(
data = measureClient2(addr, a2b_hex(
cmd), measureType, houseType, pointNum, timeout)
windList.extend(data)
logging.info(f"所有的通风数据为{windList}")
return windList
else:
......@@ -424,7 +478,7 @@ def handleCommand(command: str):
timeout = 50
# 其余类型为3秒
else:
timeout = 2
timeout = 4
data = measureit(houseId, tp, timeout)
# 对于水分信息,需要获取到其类型
if tp == "water":
......@@ -483,19 +537,39 @@ def recv(tcpSocket):
try:
while True:
# 接收数据解码替换成全小写
data = tcpSocket.recv(1024)
data = data.decode("utf-8").lower()
data = tcpSocket.recv(2048)
# alldata = data
# if b"measure" in data:
# while (not data.endswith(b"aaaa")):
# data = tcpSocket.recv(1024)
# alldata = alldata + data
# data = alldata.decode("utf-8").lower()[:-4]
# 处理客户端断开造成的失败
data = data.decode("utf-8").lower()
if not data:
break
logging.info(f"接收到服务器数据: {data}")
if "ff" in data and len(data) < 10:
logging.info("接收心跳包成功")
elif "measure" in data or "control" in data:
alldata = data
while ("aaaa" not in data):
data = tcpSocket.recv(1024)
data = data.decode().lower()
alldata = alldata + data
data = alldata[:-4]
global all_exterList
all_exterList = None
cmds = data.split("eeffeeff")
print(cmds)
ttt = []
for cmd in cmds:
if len(cmd) != 0:
gevent.spawn(multiCmd, cmd, tcpSocket)
if len(cmd) > 10:
t = threading.Thread(
target=multiCmd, args=(cmd, tcpSocket,))
ttt.append(t)
[tt.start() for tt in ttt]
[tt.join() for tt in ttt]
else:
logging.info("不明指令")
......@@ -521,7 +595,7 @@ def multiCmd(cmd, tcpSocket):
# 定时发送心跳信息
def heartBeat(tcpSocket):
sendData = b"ABCffff"
sendData = b"ABCfffe"
while True:
tcpSocket.send(sendData)
logging.info("发送心跳信息")
......@@ -530,18 +604,24 @@ def heartBeat(tcpSocket):
# 建立和云平台的连接
def TCPClient():
#sendAddr = ('127.0.0.1', 7002)
sendAddr = ('49.4.64.207', 6050)
while True:
try:
tcpSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
tcpSocket.connect(sendAddr)
logging.info(f"正在连接:{sendAddr}")
gevent.joinall([
gevent.spawn(recv, tcpSocket),
gevent.spawn(heartBeat, tcpSocket)
])
# 添加两个线程
t = threading.Thread(
target=recv, name='接收线程', args=(tcpSocket,))
t1 = threading.Thread(
target=heartBeat, name='心跳线程', args=(tcpSocket,))
t.start()
t1.start()
t.join()
t1.join()
except Exception as e:
logging.warning(f"连接发生错误:{e},30秒后再次尝试连接....")
logging.warn(f"连接发生错误:{e},30秒后再次尝试连接....")
time.sleep(30)
# 30秒重新连接服务器
continue
......@@ -552,4 +632,7 @@ def main():
if __name__ == "__main__":
main()
try:
main()
except KeyboardInterrupt:
pass
This diff is collapsed.
This diff is collapsed.
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment