Advertisement
Kaelygon

gmpy2 multithreaded

Oct 29th, 2024 (edited)
88
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 9.56 KB | None | 0 0
  1. import ctypes
  2. import multiprocessing
  3. import threading
  4. import time
  5. import gmpy2
  6. import sys
  7. import os
  8.  
# When True, every prime p found by a worker also has p-2 and p+2 tested,
# and any prime neighbour is reported as a twin.
CHECK_FOR_TWIN_PRIMES = True
  10.  
  11. class Coefficients:
  12.     def __init__(self, a, b, n, c):
  13.         self.a = a
  14.         self.b = b
  15.         self.n = n
  16.         self.c = c
  17.  
  18. class MainConfig:
  19.     def __init__(self, argv):
  20.         self.args = argv
  21.         self.programName = "None"
  22.         self.doClearFile = False
  23.         self.nMaxCount = 1024
  24.        
  25. def writeToFile(file,inStr,type="w"):
  26.     with open(file, type) as f:  # Write first unsolved to progress file
  27.         f.write(str(inStr))
  28.                
  29. def listenKeyQuit(killSignal, keySignal):
  30.     import select, sys
  31.     while not killSignal.value:
  32.         # Use select to check for input availability without blocking
  33.         if select.select([sys.stdin], [], [], 0.1)[0]:
  34.             user_input = sys.stdin.readline().strip()
  35.             if user_input == 'q':
  36.                 print("Quit received! Waiting for threads.")
  37.                 keySignal.value = True
  38.                 break
  39.  
  40.  
  41. def findPrimeThread(coef, threadStorage, workPool, killSignal):
  42.     while True:
  43.         workPool[2] = True  # Indicate to manager that work is needed
  44.         while workPool[2] == True:
  45.             if killSignal.value:
  46.                 return
  47.             time.sleep(0.01)  # Yield until manager assigns work
  48.  
  49.         # Process assigned work range
  50.         for a in range(*coef.a):
  51.             for c in range(*coef.c):
  52.                 if (not (a&1)) and (not (c&1)): # Early continue if result is always even
  53.                     continue
  54.                 for b in range(*coef.b):
  55.                     for n in range(workPool[0], workPool[1], coef.n[2]): #Use range from workPool given by workManager
  56.                         resultValue = a * (b ** n) + c
  57.                         if gmpy2.is_prime(resultValue):
  58.                             threadStorage.put((a, b, n, c, False))  # Append prime info to threadStorage
  59.                             if CHECK_FOR_TWIN_PRIMES:
  60.                                 for i in range(-2,3,4,):
  61.                                     if gmpy2.is_prime(resultValue+i):
  62.                                         threadStorage.put((a, b, n, c+i, True))
  63.  
  64.  
  65. def queueToFormulas(storageBuffer, bigFormula, nBig):
  66.     outString = ""
  67.     while not storageBuffer.empty():
  68.         (a, b, n, c, isTwin) = storageBuffer.get()
  69.         formula = f"{b}**{n}"
  70.         if a != 1:  # Ignore 1*
  71.             formula = f"{a}*" + formula
  72.         if c >= 0:  # only add '+' sign to positive c
  73.             formula += f"+"
  74.         formula += f"{c}"  # +c
  75.         outString += f"{formula}"
  76.         if isTwin:
  77.             outString+=" T"
  78.         outString+="\n"
  79.         if n > nBig:  # store biggest n
  80.             nBig = n
  81.             bigFormula = formula
  82.     return (outString, bigFormula, nBig)
  83.  
  84.  
  85. def stopWorkers(workPools, threadCount=1, updateRate=0.01):
  86.     allDone = False
  87.     while allDone == False:  # wait till threads have finished their last tasks
  88.         allDone = True
  89.         for tid in range(threadCount):
  90.             if workPools[tid][2] == False:
  91.                 allDone = False
  92.         time.sleep(updateRate)
  93.     return True
  94.    
  95. def getConsoleUpdate(coef,finishedCount):
  96.     numerator = finishedCount - coef.n[0]
  97.     denominator = max(0, coef.n[1] - coef.n[0] + 1)
  98.     separator = " | "
  99.     printString = ""
  100.     printString+= f"{coef.n[0]} to {coef.n[1]}"
  101.     printString+= separator
  102.     printString+= f"progress={numerator}/{denominator}"
  103.     printString+= separator
  104.     printString+= f"{100.0 * numerator / (denominator):.2f}%"
  105.     return printString
  106.  
  107. def workManagerThread(coef, batchSize, workPools, threadStorage, threadCount, killSignal, keySignal, progressFile, outputFile):
  108.     nLast = coef.n[0]  # first unsolved value of n
  109.     oldN = -1  # used to compare n changes
  110.     nBig = 0  # console print biggest n
  111.     updateRate = 0.05  # time in seconds between check and file writes
  112.     printRate = 2.0  # time between console prints in seconds
  113.  
  114.     startTime = 0
  115.     printTime = 0
  116.     endTime = 1000
  117.     finishedCount = 0
  118.     bigFormula = ""
  119.     while not killSignal.value:
  120.         if keySignal.value == True:
  121.             killSignal.value = stopWorkers(workPools, threadCount, updateRate)
  122.  
  123.         # Update progress tracking
  124.         if nLast > oldN:
  125.             oldN = nLast
  126.             # subtract the n values that are being worked on. Accurate unless a thread is more than 2 or more batches behind
  127.             finishedCount = nLast - batchSize * threadCount
  128.             finishedCount = max(finishedCount, coef.n[0]) #ensure the corrective offset doesn't result lower count than we started
  129.             writeToFile(progressFile,finishedCount)
  130.  
  131.             if nLast > coef.n[1]:
  132.                 killSignal.value = stopWorkers(workPools, threadCount, updateRate)
  133.  
  134.         # Assign work to available processes
  135.         for tid in range(threadCount):
  136.             if workPools[tid][2] == True:  # Access doneFlag using indexing
  137.                 storageBuffer = threadStorage[tid]
  138.  
  139.                 # Assign new work
  140.                 if killSignal.value == False:
  141.                     workPools[tid][0] = nLast  # workPools[tid].start
  142.                     workPools[tid][1] = nLast + batchSize*coef.n[2]  # workPools[tid].end
  143.                     workPools[tid][2] = False  # workPools[tid].doneFlag
  144.                     nLast += batchSize
  145.  
  146.                 # Process results from workers
  147.                 (outString, bigFormula, nBig) = queueToFormulas(storageBuffer, bigFormula, nBig)
  148.  
  149.                 if outString:  # print result strings to file
  150.                     writeToFile(outputFile,outString,"a")
  151.  
  152.             if nLast > coef.n[1] and killSignal.value == False:  # stop passing new tasks after max
  153.                 break
  154.  
  155.         # Sleep until the next update
  156.         endTime = time.time()
  157.         time.sleep(max(0, updateRate - (endTime - startTime)))
  158.         startTime = endTime
  159.  
  160.         # console update
  161.         if (endTime - printTime) > printRate:
  162.             printString = getConsoleUpdate(coef,finishedCount)
  163.             if (bigFormula != ""):
  164.                 printString += f" | high prime={bigFormula}"
  165.             print(printString)
  166.             printTime = endTime
  167.  
  168.     return nLast  # last unsolved n
  169.                
  170. def manPagePrintUsage(cfg):
  171.     print("Much of this script is yet to be done")
  172.     print("Search primes at form a*b^n+c")
  173.     print("Output file: primeForms.txt")
  174.     print("Input file: nProgress.txt")
  175.     print(f"Usage:\n")
  176.     print(f"python {cfg.args[0]}.py -? -h --help")
  177.     print(f"python {cfg.args[0]}.py [largest n]")
  178.    
  179.  
  180. def manPage(cfg):
  181.     if len(cfg.args) > 1:
  182.         try: # Read last progress from file if available
  183.             cfg.nMaxCount = int(cfg.args[1].strip())
  184.             return 0
  185.         except (FileNotFoundError, ValueError):
  186.             cfg.nMaxCount = 1
  187.  
  188.         manPagePrintUsage(cfg)
  189.         return 1
  190.    
  191.  
  192. def main():
  193.     cfg = MainConfig(sys.argv)
  194.     if manPage(cfg):
  195.         return 0
  196.  
  197.     progressFile = "nProgress.txt"
  198.     outputFile = "primeForms.txt"
  199.    
  200.     #write header
  201.     if os.stat(outputFile).st_size == 0 or cfg.doClearFile==True:
  202.         writeToFile(outputFile,"Prime a*b^n+c\n")
  203.         writeToFile(progressFile,int(1))
  204.  
  205.     nStart = 1
  206.     batchSize = 1
  207.    
  208.     try: # Check if outputFile is empty or exists
  209.         with open(outputFile, "r") as f:
  210.             err = (f.read().strip())
  211.     except (FileNotFoundError):
  212.         with open(outputFile, "w") as f:
  213.             f.write("Prime a*b^n+c\n")
  214.  
  215.     # Try to resume from last saved progress
  216.     try:
  217.         with open(progressFile, "r") as f:
  218.             nStart = int(f.read().strip())
  219.     except (FileNotFoundError, ValueError):
  220.         nStart = 1  # Start from scratch if no progress file
  221.        
  222.     nMax = nStart + cfg.nMaxCount - 1
  223.  
  224.     aList = [9,10,1]
  225.     bList = [13238717,13238718,1]
  226.     nList = [nStart,nMax,1]
  227.     cList = [-2, 3,4]
  228.    
  229.     threadCount = os.cpu_count()
  230.     threadCount = min(threadCount,nMax-nStart+1)
  231.    
  232.     coef = Coefficients(aList, bList, nList, cList)
  233.  
  234.     # Create a Queue to store results
  235.     threadStorage = [multiprocessing.Queue() for _ in range(threadCount)]
  236.    
  237.     # Initialize shared data structures
  238.     workPools = [multiprocessing.Array(ctypes.c_int, 3) for _ in range(threadCount)]  # (nStart, nEnd, doneFlag)
  239.  
  240.     for wp in workPools:
  241.         wp[0] = 0  # start
  242.         wp[1] = 0  # end
  243.         wp[2] = False  # doneFlag
  244.        
  245.     killSignal = multiprocessing.Value(ctypes.c_bool, False)
  246.     keySignal = multiprocessing.Value(ctypes.c_bool, False)
  247.  
  248.     listener_thread = threading.Thread(target=listenKeyQuit, args=(killSignal, keySignal,))
  249.     listener_thread.start()
  250.  
  251.  
  252.     # Launch worker processes
  253.     processes = []
  254.     for tid in range(threadCount):
  255.         p = multiprocessing.Process(target=findPrimeThread, args=(coef, threadStorage[tid], workPools[tid], killSignal))
  256.         p.start()
  257.         processes.append(p)
  258.  
  259.     # Run work manager thread
  260.     nLast = workManagerThread(coef, batchSize, workPools, threadStorage, threadCount, killSignal, keySignal, progressFile, outputFile)
  261.  
  262.     killSignal.value = True  # This will signal listener_thread to stop
  263.  
  264.     # Wait for all processes to complete
  265.     for p in processes:
  266.         p.join()
  267.  
  268.     writeToFile(progressFile,nLast)
  269.  
  270.     print(getConsoleUpdate(coef,nLast))
  271.     print("All threads done.")
  272.     sys.exit()
  273.    
  274.     listener_thread.join()   # Wait for listener_thread to cleanly exit
  275.  
  276.  
# Run main() only when this file is executed as a script, not when it is
# imported as a module.
if __name__ == "__main__":
    main()
  279.  
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement