ПЛК управления двигателем на Python
У нас есть различные типы устройств, такие как сов, мотор, аналоговые, цифровые, регулирующие клапаны и т. д. Каждый тип устройства имеет 100 наименований.
Теперь наше программное обеспечение постоянно отслеживает с помощью ПЛК чтение некоторых свойств каждого типа, в соответствии с которыми нам нужно записать какое-либо свойство.
Например, если команда двигателя имеет высокий уровень, то нам необходимо записать сигнал обратной связи на конце ПЛК с высоким уровнем. На данный момент столкнулся с проблемой, что слишком много времени уходит на обновление.
Теперь я описываю свою программную архитектуру:
Я делаю индивидуальный класс для каждого типа устройств. Например, для двигателя:
класс Fn_Motor1D:
def __init__(self,com,df,idxNo):
self._idxNo =idxNo
self.gen = com
self.df = df
self.setup()
self.initilizedigitalinput()
def setup(self):
try:
for tag,col in self.readalltags():
if col==3:
self.cmd = str(tag)
if col == 4:
self.runingFB = str(tag)
if col == 5:
self.remoteFB = str(tag)
if col == 6:
self.healthyFB = str(tag)
if col == 7:
self.readyFB = str(tag)
if col == 8:
self.mccbonFeedBack = str(tag)
if col == 9:
self.overloadFeedBack = str(tag)
if col == 10:
self.faultFB =str(tag)
if col == 11:
self.delaytime = tag
except Exception as e:
print("exception raise",e.args)
log_exception(e)
def initilizedigitalinput(self):
try:
if len(self.healthyFB) > 3:
self.gen.writegeneral.writenodevalue(self.healthyFB, 1)
else:
pass
if len(self.remoteFB) > 3:
self.gen.writegeneral.writenodevalue(self.remoteFB, 1)
else:
pass
if len(self.readyFB) > 3:
self.gen.writegeneral.writenodevalue(self.readyFB, 1)
else:
pass
if len(self.mccbonFeedBack) > 3:
self.gen.writegeneral.writenodevalue(self.mccbonFeedBack, 1)
else:
pass
if len(self.overloadFeedBack) > 3:
self.gen.writegeneral.writenodevalue(self.overloadFeedBack, 0)
else:
pass
if len(self.faultFB) > 3:
self.gen.writegeneral.writenodevalue(self.faultFB, 0)
else:
pass
except Exception as e :
log_exception(e)
def process(self):
try:
oncommandvalue = self.gen.readgeneral.readtagvalue(self.cmd)
if oncommandvalue:
sleep(self.delaytime)
self.gen.writegeneral.writenodevalue(self.runingFB, 1)
else:
self.gen.writegeneral.writenodevalue(self.runingFB, 0)
except Exception as e:
log_exception(e)
def readalltags(self):
n = 3
row, col = self.df.shape
print(col)
while n < col:
data = self.df.iloc[self._idxNo, n]
yield data,n
n = n + 1
Назовите все моторные устройства в одном классе и сделайте listofallmotor1d устройств.
класс Cal_AllMotor1D:
def __init__(self,df,com):
self.df = df
self.com = com
self.listofmotor1D = []
self.setup()
def setup(self):
try:
n =0
self.listofmotor1D.clear()
while n< len(self.df.index):
self.df.iloc[n, 0] = Fn_Motor1D(self.com, self.df, n)
self.listofmotor1D.append(self.df.iloc[n,0])
n = n + 1
except Exception as e :
print(e.args)
log_exception(e)
def process(self):
n = 0
while n < len(self.listofmotor1Dir):
self.listofmotor1Dir[n].process()
n = n + 1
@property
def listofmotor1Dir(self):
if len(self.listofmotor1D) > 0:
return self.listofmotor1D
Теперь я делаю класс AllDevices, в котором я вызываю все устройства и пишу процесс мониторинга:
импорт callallmotor1D
импортировать callallmotor2D_V1
импорт callallsov1S_V1
импорт callallsov2S_V1
импортировать callalanalog_V1
импортировать callallcontrolvalves_V1
импорт callallvibrofeeder_V1
импортировать callallconveyor_V1
импортировать панд как pd
класс Всеустройства:
def __init__(self,comobject):
self._comobject = comobject
dfM1D = pd.read_excel(r'D:\OPCUA\Working_VF1.xls', sheet_name="Motor1D")
dfM2D = pd.read_excel(r'D:\OPCUA\Working_VF1.xls', sheet_name="Motor2D")
dfsov1S = pd.read_excel(r'D:\OPCUA\Working_VF1.xls', sheet_name="Valve1S")
dfsov2S = pd.read_excel(r'D:\OPCUA\Working_VF1.xls', sheet_name="Valve2S")
dfanalog = pd.read_excel(r'D:\OPCUA\Working_VF1.xls', sheet_name="AnalogTx")
dfcontrolvalve = pd.read_excel(r'D:\OPCUA\Working_VF1.xls', sheet_name="ControlValves")
dfvibrofeeder = pd.read_excel(r'D:\OPCUA\Working_VF1.xls', sheet_name="VibroFeeder")
dfconveyor = pd.read_excel(r'D:\OPCUA\Working_VF1.xls', sheet_name="Conveyor")
self.allmotor1dobjects = callallmotor1D.Cal_AllMotor1D(dfM1D, comobject)
self.allmotor2dobjects = callallmotor2D_V1.Cal_AllMotor2D(dfM2D,comobject)
self.allsov1sobjects = callallsov1S_V1.Cal_AllSov1S(dfsov1S,comobject)
self.allsov2sobjects = callallsov2S_V1.Cal_AllSov2S(dfsov2S, comobject)
self.allanalogobjects = calallanalog_V1.Cal_AllAnalogInputs(dfanalog,comobject)
self.allcontrolvalveobjects = calallcontrolvalves_V1.Cal_AllControlValve(dfcontrolvalve,comobject)
self.allvibrofeederobjects = callallvibrofeeder_V1.Cal_AllVibroFeeder(dfvibrofeeder,comobject)
self.allconveyorobjects = callallconveyor_V1.Cal_AllConveyor1D(dfconveyor,comobject)
def allmotor1dprocessing(self):
self.allmotor1dobjects.process()
self.allmotor1dobjects.process()
def allmotor2dprocessing(self):
self.allmotor2dobjects.process()
def allsov1sprocessing(self):
self.allsov1sobjects.process()
def allanalogsprocessing(self):
self.allanalogobjects.process()
def allcontrolvaleprocessing(self):
self.allcontrolvalveobjects.process()
def allvibrofeederprocessing(self):
self.allvibrofeederobjects.process()
def allconveyorprocessing(self):
self.allconveyorobjects.process()
В конце я классифицирую все устройства в своем основном пользовательском интерфейсе и запускаю отдельный поток для каждого типа устройства: —
def allmotor1dprocess():
while True :
self.callalldevices.allmotor1dprocessing()
def allmotor2dprocess():
while True:
self.callalldevices.allmotor2dprocessing()
def allsov1sprocess():
while True:
self.callalldevices.allsov1sprocessing()
def allananlogprocess():
while True:
self.callalldevices.allanalogsprocessing()
def allcontrolvalveprocess():
while True:
self.callalldevices.allcontrolvaleprocessing()
def allconveyorprocess():
while True:
self.callalldevices.allconveyorprocessing()
self.writeallmotor1dthread = threading.Thread(target=allmotor1dprocess)
self.writeallmotor2dthread =threading.Thread(target=allmotor2dprocess)
self.writeallsov1sthread1 = threading.Thread(target=allsov1sprocess)
self.writeallsov1sthread2 = threading.Thread(target=allsov1sprocess2)
self.writeallsov2sthread = threading.Thread(target=allsov2sprocess)
self.writeallanalogthread = threading.Thread(target=allananlogprocess)
self.writeallcontrolvalvethread = threading.Thread(target=allcontrolvalveprocess)
self.writeallconveyorthread = threading.Thread(target=allconveyorprocess)
self.writeallmotor1dthread.start()
self.writeallmotor2dthread.start()
self.writeallsov1sthread1.start()
self.writeallsov1sthread2.start()
self.writeallsov2sthread.start()
self.writeallanalogthread.start()
self.writeallcontrolvalvethread.start()
self.writeallconveyorthread.start()
В этой архитектуре программного обеспечения я действительно сталкиваюсь с проблемой обновления данных ПЛК.
В качестве примера предположим, что я даю команду на мотор. Это займет время, чтобы обновить отзыв о запуске. Итак, мне нужен ваш экспертный взгляд на это. Пожалуйста, дайте мне знать, как я могу улучшить свой код, чтобы ускорить процесс.