aircampro

multiprocessing in python

Feb 15th, 2022 (edited)
133
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 19.56 KB | None | 0 0
  1. #
  2. # a test stub to test multi-processing python
  3. #
  4. # it shows the functionality of the multiprocessing library
  5. # for daemon, sequential, and parallel pool
  6. #
  7. import multiprocessing
  8.  
  9. # wait
  10. import time,timeit
  11. import sys
  12.  
  13. # Starting timer for Parent measurement
  14. start_time = timeit.default_timer()
  15. timer = 0
  16.  
  17. class mavlinkSonyCamWriteVals():
  18.  
  19. STATE_INIT = 99
  20. STATE_READY = 1
  21. STATE_CAM_WRITING = 2
  22. STATE_MAV_READING = 3
  23. STATE_MAV_WRITING = 4
  24. STATE_CAM_READING = 5
  25. STATE_WAIT = 6
  26.  
  27. # indiviual states when a sequential priority queue is required
  28. FUNC_EX_PRO = 7
  29. FUNC_APER = 8
  30. FUNC_FOCUS = 9
  31. FUNC_ISO = 10
  32. FUNC_SS = 11
  33. FUNC_WB = 12
  34. FUNC_SC = 13
  35.  
  36. class_global = 99
  37. numberOfVals = 0
  38.  
  39. def __init__ (self):
  40. self.set_sony_iso = multiprocessing.Value('i', mavlinkSonyCamWriteVals.STATE_INIT)
  41. self.set_sony_aperture = multiprocessing.Value('i', mavlinkSonyCamWriteVals.STATE_INIT)
  42. mavlinkSonyCamWriteVals.numberOfVals += 1
  43.  
  44. def __del__(self):
  45. class_name = self.__class__.__name__
  46. print('{} Deleted'.format(class_name))
  47.  
  48. def add_aper(self):
  49. self.set_sony_aperture.value += 1
  50.  
  51. # -------------------------------- SEQUENTIAL ------------------------------------------------------------------------------
  52. #
  53. # this task runs sequentially and doesn't set the variables (attributes) of the class passed to it.
  54. #
  55. def manageAlphaCameraExpro( classObj, pvar, mpc, state_of_task ):
  56.  
  57. state_of_task.value = mavlinkSonyCamWriteVals.STATE_CAM_WRITING
  58. print(f"\033[96m Task1:: initial value set is {classObj.set_sony_iso.value} {classObj.set_sony_aperture.value} {mavlinkSonyCamWriteVals.class_global} {pvar} {mpc} \033[0m")
  59. time.sleep(1)
  60. state_of_task.value = mavlinkSonyCamWriteVals.STATE_WAIT
  61. with classObj.set_sony_iso.get_lock():
  62. classObj.set_sony_iso.value += 1
  63. classObj.add_aper()
  64. mavlinkSonyCamWriteVals.class_global += 1
  65. pvar += 1
  66. with mpc.get_lock():
  67. mpc.value = mavlinkSonyCamWriteVals.FUNC_APER
  68. print(f"Task1:: second value set is {classObj.set_sony_iso.value} {classObj.set_sony_aperture.value} {mavlinkSonyCamWriteVals.class_global} {pvar} {mpc}")
  69. # advance to the next routine in the queued sequence
  70. with state_of_task.get_lock():
  71. state_of_task.value = mavlinkSonyCamWriteVals.STATE_READY
  72. # true paralel tasking mode --> state_of_task.value = mavlinkSonyCamWriteVals.STATE_READY
  73.  
  74. #
  75. # this task runs sequentially and doesn't set the variables (attributes) of the class passed to it.
  76. #
  77. def manageAlphaCameraAperture( classObj, pvar, mpc, state_of_task ):
  78.  
  79. state_of_task.value = mavlinkSonyCamWriteVals.STATE_CAM_WRITING
  80. print(f"\033[95m Task2:: initial value set is {classObj.set_sony_iso.value} {classObj.set_sony_aperture.value} {mavlinkSonyCamWriteVals.class_global} {pvar} {mpc} \033[0m")
  81. time.sleep(1)
  82. state_of_task.value = mavlinkSonyCamWriteVals.STATE_WAIT
  83. with classObj.set_sony_iso.get_lock():
  84. classObj.set_sony_iso.value += 1
  85. classObj.add_aper()
  86. mavlinkSonyCamWriteVals.class_global += 1
  87. pvar += 1
  88. with mpc.get_lock():
  89. mpc.value = mavlinkSonyCamWriteVals.FUNC_FOCUS
  90. print(f"Task2:: second value set is {classObj.set_sony_iso.value} {classObj.set_sony_aperture.value} {mavlinkSonyCamWriteVals.class_global} {pvar} {mpc}")
  91. # advance to the next routine in the queued sequence
  92. with state_of_task.get_lock():
  93. state_of_task.value = mavlinkSonyCamWriteVals.STATE_READY
  94. # true paralel tasking mode --> state_of_task.value = mavlinkSonyCamWriteVals.STATE_READY
  95.  
  96. def manageAlphaCameraFocusData( classObj, pvar, mpc, state_of_task ):
  97.  
  98. state_of_task.value = mavlinkSonyCamWriteVals.STATE_CAM_WRITING
  99. print(f"\033[94m Task3:: initial value set is {classObj.set_sony_iso.value} {classObj.set_sony_aperture.value} {mavlinkSonyCamWriteVals.class_global} {pvar} {mpc} \033[0m")
  100. time.sleep(1)
  101. state_of_task.value = mavlinkSonyCamWriteVals.STATE_WAIT
  102. with classObj.set_sony_iso.get_lock():
  103. classObj.set_sony_iso.value += 1
  104. classObj.add_aper()
  105. mavlinkSonyCamWriteVals.class_global += 1
  106. pvar += 1
  107. with mpc.get_lock():
  108. mpc.value = mavlinkSonyCamWriteVals.FUNC_ISO
  109. print(f"Task3:: second value set is {classObj.set_sony_iso.value} {classObj.set_sony_aperture.value} {mavlinkSonyCamWriteVals.class_global} {pvar} {mpc}")
  110. # advance to the next routine in the queued sequence
  111. with state_of_task.get_lock():
  112. state_of_task.value = mavlinkSonyCamWriteVals.STATE_READY
  113. # true paralel tasking mode --> state_of_task.value = mavlinkSonyCamWriteVals.STATE_READY
  114.  
  115. def manageAlphaCameraIso( classObj, pvar, mpc, state_of_task ):
  116.  
  117. state_of_task.value = mavlinkSonyCamWriteVals.STATE_CAM_WRITING
  118. print(f"\033[93m Task4:: initial value set is {classObj.set_sony_iso.value} {classObj.set_sony_aperture.value} {mavlinkSonyCamWriteVals.class_global} {pvar} {mpc} \033[0m")
  119. time.sleep(1)
  120. state_of_task.value = mavlinkSonyCamWriteVals.STATE_WAIT
  121. with classObj.set_sony_iso.get_lock():
  122. classObj.set_sony_iso.value += 1
  123. classObj.add_aper()
  124. mavlinkSonyCamWriteVals.class_global += 1
  125. pvar += 1
  126. with mpc.get_lock():
  127. mpc.value = mavlinkSonyCamWriteVals.FUNC_SS
  128. print(f"Task4:: second value set is {classObj.set_sony_iso.value} {classObj.set_sony_aperture.value} {mavlinkSonyCamWriteVals.class_global} {pvar} {mpc}")
  129. # advance to the next routine in the queued sequence
  130. with state_of_task.get_lock():
  131. state_of_task.value = mavlinkSonyCamWriteVals.STATE_READY
  132. # true paralel tasking mode --> state_of_task.value = mavlinkSonyCamWriteVals.STATE_READY
  133.  
  134. def manageAlphaCameraShutSpd( classObj, pvar, mpc, state_of_task ):
  135.  
  136. state_of_task.value = mavlinkSonyCamWriteVals.STATE_CAM_WRITING
  137. print(f"\033[92m Task5:: initial value set is {classObj.set_sony_iso.value} {classObj.set_sony_aperture.value} {mavlinkSonyCamWriteVals.class_global} {pvar} {mpc} \033[0m")
  138. time.sleep(1)
  139. state_of_task.value = mavlinkSonyCamWriteVals.STATE_WAIT
  140. with classObj.set_sony_iso.get_lock():
  141. classObj.set_sony_iso.value += 1
  142. classObj.add_aper()
  143. mavlinkSonyCamWriteVals.class_global += 1
  144. pvar += 1
  145. with mpc.get_lock():
  146. mpc.value = mavlinkSonyCamWriteVals.FUNC_WB
  147. print(f"Task5:: second value set is {classObj.set_sony_iso.value} {classObj.set_sony_aperture.value} {mavlinkSonyCamWriteVals.class_global} {pvar} {mpc}")
  148. # advance to the next routine in the queued sequence
  149. with state_of_task.get_lock():
  150. state_of_task.value = mavlinkSonyCamWriteVals.STATE_READY
  151. # true paralel tasking mode --> state_of_task.value = mavlinkSonyCamWriteVals.STATE_READY
  152.  
  153. def manageAlphaWhiteBala( classObj, pvar, mpc, state_of_task ):
  154.  
  155. state_of_task.value = mavlinkSonyCamWriteVals.STATE_CAM_WRITING
  156. print(f"\033[91;44m Task6:: initial value set is {classObj.set_sony_iso.value} {classObj.set_sony_aperture.value} {mavlinkSonyCamWriteVals.class_global} {pvar} {mpc} \033[0m")
  157. time.sleep(1)
  158. state_of_task.value = mavlinkSonyCamWriteVals.STATE_WAIT
  159. with classObj.set_sony_iso.get_lock():
  160. classObj.set_sony_iso.value += 1
  161. classObj.add_aper()
  162. mavlinkSonyCamWriteVals.class_global += 1
  163. pvar += 1
  164. with mpc.get_lock():
  165. mpc.value = mavlinkSonyCamWriteVals.FUNC_SC
  166. print(f"Task6:: second value set is {classObj.set_sony_iso.value} {classObj.set_sony_aperture.value} {mavlinkSonyCamWriteVals.class_global} {pvar} {mpc}")
  167. # advance to the next routine in the queued sequence
  168. with state_of_task.get_lock():
  169. state_of_task.value = mavlinkSonyCamWriteVals.STATE_READY
  170. # true paralel tasking mode --> state_of_task.value = mavlinkSonyCamWriteVals.STATE_READY
  171.  
  172. def manageAlphaCameraStillCap( classObj, pvar, mpc, state_of_task ):
  173.  
  174. state_of_task.value = mavlinkSonyCamWriteVals.STATE_CAM_WRITING
  175. print(f"\033[97m Task7:: initial value set is {classObj.set_sony_iso.value} {classObj.set_sony_aperture.value} {mavlinkSonyCamWriteVals.class_global} {pvar} {mpc} \033[0m")
  176. time.sleep(1)
  177. state_of_task.value = mavlinkSonyCamWriteVals.STATE_WAIT
  178. with classObj.set_sony_iso.get_lock():
  179. classObj.set_sony_iso.value += 1
  180. classObj.add_aper()
  181. mavlinkSonyCamWriteVals.class_global += 1
  182. pvar += 1
  183. with mpc.get_lock():
  184. mpc.value = mavlinkSonyCamWriteVals.FUNC_EX_PRO
  185. print(f"Task7:: second value set is {classObj.set_sony_iso.value} {classObj.set_sony_aperture.value} {mavlinkSonyCamWriteVals.class_global} {pvar} {mpc}")
  186. # advance to the next routine in the queued sequence
  187. with state_of_task.get_lock():
  188. state_of_task.value = mavlinkSonyCamWriteVals.STATE_READY
  189. # true paralel tasking mode --> state_of_task.value = mavlinkSonyCamWriteVals.STATE_READY
  190.  
  191. # --------------------------- DAEMON --------------------------------------------------------------
  192. #
  193. # This task seems to write the value to the class until it exits
  194. #
  195. def manageTask2( classObj, mpc, count=5 ):
  196. while (count >= 0):
  197. print(f"\033[93;45m Task2 Daemon:: initial value set is {classObj.set_sony_iso.value} {classObj.set_sony_aperture.value} {mavlinkSonyCamWriteVals.class_global} {mpc} \033[0m")
  198. time.sleep(2)
  199. with classObj.set_sony_iso.get_lock():
  200. classObj.set_sony_iso.value += 1
  201. classObj.add_aper()
  202. mavlinkSonyCamWriteVals.class_global += 1
  203. print(f"Task2 Daemon:: second value set is {classObj.set_sony_iso.value} {classObj.set_sony_aperture.value} {mavlinkSonyCamWriteVals.class_global} {mpc}")
  204. count -= 1
  205.  
  206. def manageTask3( classObj, mpc ):
  207. while True:
  208. manageTask2( classObj, mpc )
  209. time.sleep(10)
  210.  
  211. # ----------------------- PARALEL -----------------------------------------------------------------
  212. #
  213. def sendMavExpro( wedge ):
  214. print(f"\033[32;41m process sendMavExpro {wedge} \033[0m")
  215. time.sleep(1)
  216. timer = str(int(timeit.default_timer() - start_time))
  217. print (f"Elapsed Time: {timer} Loop Time: {str(time.time()/1000)} ")
  218.  
  219. def sendMavAper( wedge ):
  220. time.sleep(1)
  221. print(f"\033[33;42m process sendMavAper {wedge} \033[0m")
  222. time.sleep(1)
  223. timer = str(int(timeit.default_timer() - start_time))
  224. print (f"Elapsed Time: {timer} Loop Time: {str(time.time()/1000)} ")
  225.  
  226. def sendMavFocusData( wedge ):
  227. time.sleep(1)
  228. print(f"\033[30;43m process sendMavFocusData {wedge} \033[0m")
  229. time.sleep(1)
  230. timer = str(int(timeit.default_timer() - start_time))
  231. print (f"Elapsed Time: {timer} Loop Time: {str(time.time()/1000)} ")
  232.  
  233. def sendMavIso( wedge ):
  234. time.sleep(1)
  235. print(f"\033[32;44m process sendMavIso {wedge} \033[0m")
  236. time.sleep(1)
  237. timer = str(int(timeit.default_timer() - start_time))
  238. print (f"Elapsed Time: {timer} Loop Time: {str(time.time()/1000)} ")
  239.  
  240. def sendMavShutSpd( wedge ):
  241. time.sleep(1)
  242. print(f"\033[36;45m process sendMavShutSpd {wedge} \033[0m")
  243. time.sleep(4)
  244. timer = str(int(timeit.default_timer() - start_time))
  245. print (f"Elapsed Time: {timer} Loop Time: {str(time.time()/1000)} ")
  246.  
  247. def sendMavWhiteBala( wedge ):
  248. time.sleep(1)
  249. print(f"\033[37;46m process sendMavWhiteBala {wedge} \033[0m")
  250. time.sleep(2)
  251. timer = str(int(timeit.default_timer() - start_time))
  252. print (f"Elapsed Time: {timer} Loop Time: {str(time.time()/1000)} ")
  253.  
  254. def sendMavStillCap( wedge ):
  255. time.sleep(1)
  256. print(f"\033[31;47m process sendMavStillCap {wedge} \033[0m")
  257. time.sleep(1.5)
  258. timer = str(int(timeit.default_timer() - start_time))
  259. print (f"Elapsed Time: {timer} Loop Time: {str(time.time()/1000)} ")
  260.  
  261. if __name__ == '__main__':
  262.  
  263. # now use one of the processing values from the multiprocessing library
  264. #
  265. mp_choice = multiprocessing.Value('i', mavlinkSonyCamWriteVals.FUNC_EX_PRO)
  266.  
  267. mp_state = multiprocessing.Value('i', mavlinkSonyCamWriteVals.STATE_INIT)
  268.  
  269. # use an internal program variable looks like it cant we written to when in multiprocessing mode
  270. #
  271. programVar = mavlinkSonyCamWriteVals.STATE_INIT
  272.  
  273. gcsWrites2Sony = mavlinkSonyCamWriteVals()
  274. p1 = multiprocessing.Process(name='manageAlphaCameraExpro', target=manageAlphaCameraExpro, args=(gcsWrites2Sony,programVar, mp_choice, mp_state, ))
  275. print("p1 is created...")
  276.  
  277. time.sleep(0.5)
  278. #
  279. # ================================= DAEMON CONTINUOS TASK No.1 ======================================
  280. #
  281. # this runs the daemon independantly once
  282. #
  283. # p2 = multiprocessing.Process(name='manageTask2', target=manageTask2, args=(gcsWrites2Sony, mp_choice,))
  284. #
  285. # this one repeats the above method every 30 seconds as a continuous daemon
  286. #
  287. p2 = multiprocessing.Process(name='manageTask3', target=manageTask3, args=(gcsWrites2Sony, mp_choice,))
  288. p2.daemon = True
  289. if not p2.is_alive() == True:
  290. p2.start()
  291. print("daemon is running")
  292.  
  293. # initialise pool data
  294. #
  295. max_number_processes = 7
  296. wedge = 14356
  297.  
  298. #
  299. # continuos task scheduler
  300. #
  301. while True:
  302. print("start at top of loop using multiprocessing.....")
  303.  
  304. #
  305. # ===================== Sequential Task Scheduler ==========================
  306. #
  307. # runs each task in a linear sequence wating for completion in the background
  308. #
  309.  
  310. if p1 is not None:
  311. print("p1 is made")
  312. if (p1.is_alive() == False):
  313. print(f"\033[36m p1 is re-starting {mp_state.value} \033[0m")
  314. try:
  315. if (mp_choice.value == mavlinkSonyCamWriteVals.FUNC_EX_PRO):
  316. print("\033[31m EXPRO \033[0m")
  317. p1 = multiprocessing.Process(name='manageAlphaCameraExpro', target=manageAlphaCameraExpro, args=(gcsWrites2Sony,programVar,mp_choice, mp_state,))
  318. elif (mp_choice.value == mavlinkSonyCamWriteVals.FUNC_APER):
  319. print("\033[31m APER \033[0m")
  320. p1 = multiprocessing.Process(name='manageAlphaCameraAperture', target=manageAlphaCameraAperture, args=(gcsWrites2Sony,programVar,mp_choice, mp_state,))
  321. elif (mp_choice.value == mavlinkSonyCamWriteVals.FUNC_FOCUS):
  322. print("\033[31m FOCUS \033[0m")
  323. p1 = multiprocessing.Process(name='manageAlphaCameraFocusData', target=manageAlphaCameraFocusData, args=(gcsWrites2Sony,programVar,mp_choice, mp_state,))
  324. elif (mp_choice.value == mavlinkSonyCamWriteVals.FUNC_ISO):
  325. print("\033[31m ISO \033[0m")
  326. p1 = multiprocessing.Process(name='manageAlphaCameraIso', target=manageAlphaCameraIso, args=(gcsWrites2Sony,programVar,mp_choice, mp_state,))
  327. elif (mp_choice.value == mavlinkSonyCamWriteVals.FUNC_SS):
  328. print("\033[31m SS \033[0m")
  329. p1 = multiprocessing.Process(name='manageAlphaCameraShutSpd', target=manageAlphaCameraShutSpd, args=(gcsWrites2Sony,programVar,mp_choice, mp_state,))
  330. elif (mp_choice.value == mavlinkSonyCamWriteVals.FUNC_WB):
  331. print("\033[31m WB \033[0m")
  332. p1 = multiprocessing.Process(name='manageAlphaWhiteBala', target=manageAlphaWhiteBala, args=(gcsWrites2Sony,programVar,mp_choice, mp_state,))
  333. elif (mp_choice.value == mavlinkSonyCamWriteVals.FUNC_SC):
  334. print("\033[31m SC \033[0m")
  335. p1 = multiprocessing.Process(name='manageAlphaCameraStillCap', target=manageAlphaCameraStillCap, args=(gcsWrites2Sony,programVar,mp_choice, mp_state,))
  336.  
  337. if ((mp_state.value == mavlinkSonyCamWriteVals.STATE_READY) or (mp_state.value == mavlinkSonyCamWriteVals.STATE_INIT)):
  338. print("\033[33m p1 is re-started \033[0m")
  339. with mp_state.get_lock():
  340. mp_state.value == mavlinkSonyCamWriteVals.STATE_CAM_WRITING
  341. p1.start()
  342. except Exception as err_msg:
  343. print("error message {}".format(err_msg))
  344. else:
  345. print("\033[35m p1 is still running \033[0m")
  346. else:
  347. if (mp_choice.value == mavlinkSonyCamWriteVals.FUNC_EX_PRO):
  348. p1 = multiprocessing.Process(name='manageAlphaCameraExpro', target=manageAlphaCameraExpro, args=(gcsWrites2Sony,programVar,mp_choice, mp_state,))
  349. elif (mp_choice.value == mavlinkSonyCamWriteVals.FUNC_APER):
  350. p1 = multiprocessing.Process(name='manageAlphaCameraAperture', target=manageAlphaCameraAperture, args=(gcsWrites2Sony,programVar,mp_choice, mp_state,))
  351. elif (mp_choice.value == mavlinkSonyCamWriteVals.FUNC_FOCUS):
  352. p1 = multiprocessing.Process(name='manageAlphaCameraFocusData', target=manageAlphaCameraFocusData, args=(gcsWrites2Sony,programVar,mp_choice, mp_state,))
  353. elif (mp_choice.value == mavlinkSonyCamWriteVals.FUNC_ISO):
  354. p1 = multiprocessing.Process(name='manageAlphaCameraIso', target=manageAlphaCameraIso, args=(gcsWrites2Sony,programVar,mp_choice, mp_state,))
  355. elif (mp_choice.value == mavlinkSonyCamWriteVals.FUNC_SS):
  356. p1 = multiprocessing.Process(name='manageAlphaCameraShutSpd', target=manageAlphaCameraShutSpd, args=(gcsWrites2Sony,programVar,mp_choice, mp_state,))
  357. elif (mp_choice.value == mavlinkSonyCamWriteVals.FUNC_WB):
  358. p1 = multiprocessing.Process(name='manageAlphaWhiteBala', target=manageAlphaWhiteBala, args=(gcsWrites2Sony,programVar,mp_choice, mp_state,))
  359. elif (mp_choice.value == mavlinkSonyCamWriteVals.FUNC_SC):
  360. p1 = multiprocessing.Process(name='manageAlphaCameraStillCap', target=manageAlphaCameraStillCap, args=(gcsWrites2Sony,programVar,mp_choice, mp_state,))
  361.  
  362. time.sleep(0.1)
  363. if (mp_state.value == mavlinkSonyCamWriteVals.STATE_READY):
  364. print("now we are waiting for the multiprocess we started")
  365. try:
  366. p1.join()
  367. p1.terminate()
  368. print("p1 is ended")
  369. except Exception as err_msg:
  370. print("error message {}".format(err_msg))
  371.  
  372. #
  373. # ===================== Paralell Pool Scheduler ==========================
  374. #
  375. # runs each task in a paralel waiting for all to finish
  376. #
  377.  
  378. print("\033[31m;42m Always waits for all proceses before restart loop \033[0m")
  379. pool = multiprocessing.Pool(max_number_processes) #Defines the Batch Size
  380.  
  381. pool.apply_async(sendMavExpro,args=(wedge,))
  382. pool.apply_async(sendMavAper,args=(wedge,))
  383. pool.apply_async(sendMavFocusData,args=(wedge,))
  384. pool.apply_async(sendMavIso,args=(wedge,))
  385. pool.apply_async(sendMavShutSpd,args=(wedge,))
  386. pool.apply_async(sendMavWhiteBala,args=(wedge,))
  387. pool.apply_async(sendMavStillCap,args=(wedge,))
  388.  
  389. pool.close() # After all threads started we close the pool
  390. pool.join() # And wait until all threads are done
  391. print("\033[34;42m done function in-paralel \033[0m")
  392.  
  393.  
  394.  
Add Comment
Please, Sign In to add comment