Advertisement
samuelask

Stargate Finder Server

Dec 17th, 2024 (edited)
107
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Lua 29.44 KB | None | 0 0
  1. local component = require("component")
  2. local event = require("event")
  3. local serialization = require("serialization")
  4. local modem = component.modem
  5. local fs = require("filesystem")
  6. local sg = component.stargate
  7. local term = require("term")
  8. local thread = require("thread")
  9. -- Configuration
  10. local save_file = "/valid_addresses.txt"
  11. local progress_file = "/progress.json"
  12. local valid_addresses = {}
  13. local progress = {}
  14. local ready_workers = {} -- Tracks workers that are ready
  15. local total_combinations = 0
  16. total_scanned = 0
  17. gates_found = 0
  18. local addresses_validated = 0        -- Tracks addresses validated since last update
  19. local last_update_time = os.time()  -- Initialize with real-world time in seconds
  20. local addresses_per_minute = 0       -- Calculated addresses per minute
  21. local time_remaining = 0             -- Remaining time estimate (in seconds)
  22. local batch_size = 1500 -- Addresses per task
  23. local pending_ranges = {} -- Table to store unassigned ranges
  24. local worker_names_file = "/worker_names.json"
  25. local worker_names = {}
  26. local next_worker_id = 1
  27. local next_port = 2000  -- Starting port number
  28. local master_port = 1234
  29. local message_queue = {}
  30. local message_queue2 = {}
  31. local active_ranges = {}
  32.  
  33. address_length = 6
  34.  
  35. -- Allowed message types
  36. local allowed_messages = {
  37.     ["result"] = true,
  38.     ["error"] = true,
  39.     ["request_name_port"] = true,
  40.     ["ready"] = true,
  41.     ["goodbye"] = true
  42. }
  43.  
  44. -- Range Tracking
  45. local current_index = 1
  46.  
  47. local gate_types = {
  48.     MILKYWAY = {
  49.         symbols = {
  50.             "Crater", "Virgo", "Bootes", "Centaurus", "Libra", "Serpens Caput", "Norma",
  51.             "Scorpius", "Corona Australis", "Scutum", "Sagittarius", "Microscopium", "Capricornus", "Piscis Austrinus",
  52.             "Equuleus", "Aquarius", "Pegasus", "Sculptor", "Pisces", "Andromeda", "Triangulum",
  53.             "Aries", "Perseus", "Cetus", "Taurus", "Auriga", "Eridanus", "Orion",
  54.             "Canis Minor", "Monoceros", "Gemini", "Hydra", "Lynx", "Cancer", "Sextans",
  55.             "Leo Minor", "Leo"
  56.         },
  57.         point_of_origin = "Point of Origin"
  58.     },
  59.     PEGASUS = {
  60.         symbols = {
  61.             "Ca Po", "Laylox", "Ecrumig", "Avoniv", "Bydo", "Aaxel", "Aldeni",
  62.             "Setas", "Arami", "Danami", "Baselai", "Sandovi", "Illume", "Amiwill", "Sibbron",
  63.             "Gilltin", "Roehmi", "Ramnon", "Olavii", "Hacemill", "Poco Re", "Abrin",
  64.             "Salma", "Hamlinto", "Elenami", "Tahnan", "Zeo", "Once El", "Robandus", "Recktic",
  65.             "Zamilloz", "Dawnre", "Acjesis", "Lenchan", "Alura"
  66.         },
  67.         point_of_origin = "Subido"
  68.     },
  69.     UNIVERSE = {
  70.         symbols = {
  71.             "G1", "G2", "G3", "G4",
  72.             "G5", "G6", "G7", "G8",
  73.             "G9", "G10", "G11", "G12", "G13",
  74.             "G14", "G15", "G16",
  75.             "G18", "G19", "G20", "G21",
  76.             "G22", "G23", "G24", "G25", "G26",
  77.             "G27", "G28", "G29", "G30",
  78.             "G31", "G32", "G33", "G34",
  79.             "G35", "G36"
  80.         },
  81.         point_of_origin = "G17"
  82.     }
  83. }
  84. local function get_gate_type()
  85.     local gateType = sg.getGateType()
  86.     if gateType then
  87.         return gateType
  88.     else
  89.         return "MILKYWAY"  -- Default to MILKYWAY if unknown
  90.     end
  91. end
  92. -- Add a message to the queue
  93. local function enqueue_message(message)
  94.     table.insert(message_queue, message)
  95. end
  96. -- Add a message to the queue
  97. local function enqueue_message2(message)
  98.     table.insert(message_queue2, message)
  99. end
  100. -- Retrieve and remove the oldest message from the queue
  101. local function dequeue_message()
  102.     if #message_queue > 0 then
  103.         return table.remove(message_queue, 1)
  104.     end
  105.     return nil
  106. end
  107. local function dequeue_message2()
  108.     if #message_queue > 0 then
  109.         return table.remove(message_queue2, 1)
  110.     end
  111.     return nil
  112. end
  113. -- Dynamic configuration based on gate type
  114. local gate_type = get_gate_type()
  115. local symbols = gate_types[gate_type] and gate_types[gate_type].symbols or gate_types.MILKYWAY.symbols
  116. local point_of_origin = gate_types[gate_type] and gate_types[gate_type].point_of_origin or gate_types.MILKYWAY.point_of_origin
  117.  
  118. if not symbols or #symbols == 0 then
  119.     error("Symbols for gate type " .. gate_type .. " are not defined or empty!")
  120. end
  121.  
  122. if not point_of_origin then
  123.     error("Point of Origin for gate type " .. gate_type .. " is not defined!")
  124. end
  125.  
  126. if gate_types[gate_type] then
  127.     print("Detected Gate Type:", gate_type)
  128.     symbols = gate_types[gate_type].symbols
  129.     point_of_origin = gate_types[gate_type].point_of_origin
  130. else
  131.     print("Unknown Gate Type:", gate_type, "Defaulting to MILKYWAY configuration.")
  132.     gate_type = "MILKYWAY"
  133.     symbols = gate_types.MILKYWAY.symbols
  134.     point_of_origin = gate_types.MILKYWAY.point_of_origin
  135. end
  136.  
  137. local function calculate_total_combinations(symbol_count, address_length)
  138.     if address_length > symbol_count then
  139.         return 0  -- Invalid case
  140.     end
  141.  
  142.     local total = 1
  143.     for i = symbol_count, symbol_count - address_length + 1, -1 do
  144.         total = total * i
  145.     end
  146.     return total
  147. end
  148. -- Function to update ETA and addresses per minute
  149. local function update_eta(total_combinations)
  150.     if not total_combinations or total_combinations <= 0 then
  151.         print("Error: update_eta received an invalid total_combinations value:", total_combinations)
  152.         return
  153.     end
  154.    
  155.     local current_time = os.time()  -- Get current real-world time
  156.     local elapsed_time = current_time - last_update_time
  157.  
  158.  
  159.     -- Update addresses per minute and remaining time every 5 seconds
  160.     if elapsed_time >= 5 then
  161.         addresses_per_minute = (addresses_validated / elapsed_time) * 60  -- Scale to per minute
  162.         addresses_validated = 0  -- Reset counter
  163.         last_update_time = current_time
  164.  
  165.         -- Estimate remaining time
  166.         local remaining_addresses = total_combinations - total_scanned
  167.         time_remaining = addresses_per_minute > 0 and (remaining_addresses / addresses_per_minute) * 60 or 0
  168.     end
  169. end
  170. -- Function to load saved progress for the current gate type
  171. local function load_progress()
  172.     if fs.exists(progress_file) then
  173.         local file = io.open(progress_file, "r")
  174.         local all_progress = serialization.unserialize(file:read("*all"))
  175.        
  176.  
  177.         file:close()
  178.         if all_progress and all_progress[gate_type] then
  179.             print("Progress loaded for gate type:", gate_type)
  180.             return all_progress[gate_type]
  181.         else
  182.             print("No progress found for gate type:", gate_type, "Initializing fresh progress.")
  183.         end
  184.     else
  185.         print("No progress file found. Initializing fresh progress...")
  186.     end
  187.     -- Return default state if no progress is found
  188.     return { total_scanned = 0, gates_found = 0 }
  189. end
  190.  
  191. -- Function to save progress savedata for the current gate type
  192. local function save_progress(savedata)
  193.     local all_progress = {}
  194.  
  195.     -- Load existing progress file if it exists
  196.     if fs.exists(progress_file) then
  197.         local file = io.open(progress_file, "r")
  198.         all_progress = serialization.unserialize(file:read("*all")) or {}
  199.         file:close()
  200.     end
  201.  
  202.     -- Update progress for the current gate type
  203.     all_progress[gate_type] = savedata
  204.  
  205.     -- Save updated progress back to file
  206.     local file = io.open(progress_file, "w")
  207.     if file then
  208.         file:write(serialization.serialize(all_progress))
  209.         file:close()
  210.        
  211.         -- Print below the progress bar
  212.        --  local screen_width, screen_height = term.getViewport()
  213.        --  local middle_line = math.floor(screen_height / 2)
  214.        
  215.         -- Initialize next_message_line if not already done
  216.       --   if not next_message_line then
  217.       --       next_message_line = middle_line + 2  -- Start 2 lines below the progress bar
  218.       --   end
  219.        
  220.         -- Move cursor below the ETA line
  221.       --   term.setCursor(1, middle_line + 2)  -- ETA is on `middle_line + 1`, so we move to `+2`
  222.        --  term.clearLine()
  223.        -- io.write(string.format("Progress saved for gate type: %s | Scanned: %d | Gates found: %d",
  224.         --    gate_type, savedata.total_scanned, savedata.gates_found))
  225.         print("Progress saved for gate type:", gate_type, "Scanned:", total_scanned, "Gates found:", gates_found)
  226.     --  print("Progress saved for gate type: %s | Scanned: %d | Gates found: %d",
  227.      --       gate_type, savedata.total_scanned, savedata.gates_found)
  228.         -- Update the next_message_line for subsequent messages
  229.        --  next_message_line = middle_line + 3
  230.     else
  231.         -- Print error message below the ETA line
  232.        --  term.setCursor(1, middle_line + 2) -- Move below the ETA line
  233.        --  term.clearLine()
  234.       --  io.write("Error: Failed to save progress.")
  235.             print("Error: Failed to save progress.")
  236.         -- next_message_line = middle_line + 3
  237.     end
  238. end
  239.  
  240. -- Save valid address to file
  241. local function save_address(address, gate_type)
  242.     local file = io.open(save_file, "a")
  243.     if file then
  244.         file:write("Valid address: " .. table.concat(address, ", ") .. " | Gate Type: " .. gate_type .. "\n")
  245.         file:close()
  246.        
  247.         -- Print below the progress bar
  248.         local screen_width, screen_height = term.getViewport()
  249.         local middle_line = math.floor(screen_height / 2)
  250.        
  251.         -- Set the next line for the message (below progress bar)
  252.         if not next_message_line then
  253.             next_message_line = middle_line + 2  -- Start 2 lines below the progress bar
  254.         end
  255.         -- Move cursor to the message line and print the message
  256.         term.setCursor(1, next_message_line)
  257.         term.clearLine()
  258.         print("Saved valid address:", table.concat(address, ", "), "| Gate Type:", gate_type)
  259.  
  260.         -- Increment the message line for the next print
  261.         next_message_line = next_message_line + 1
  262.  
  263.         -- If we reach the bottom of the screen, reset the message line
  264.         if next_message_line > screen_height then
  265.             next_message_line = middle_line + 2
  266.         end
  267.     else
  268.        
  269.         -- Print below the progress bar
  270.         local screen_width, screen_height = term.getViewport()
  271.         local middle_line = math.floor(screen_height / 2)
  272.        
  273.         -- Set the next line for the message (below progress bar)
  274.         if not next_message_line then
  275.             next_message_line = middle_line + 2  -- Start 2 lines below the progress bar
  276.         end
  277.         -- Move cursor to the message line and print the message
  278.         term.setCursor(1, next_message_line)
  279.         term.clearLine()
  280.         print("Error: Could not save valid address.")
  281.  
  282.         -- Increment the message line for the next print
  283.         next_message_line = next_message_line + 1
  284.  
  285.         -- If we reach the bottom of the screen, reset the message line
  286.         if next_message_line > screen_height then
  287.             next_message_line = middle_line + 2
  288.         end
  289.     end
  290. end
  291.  
  292. local function display_progress(total_combinations)
  293.     -- Safeguard to ensure valid inputs
  294.     if not total_scanned then total_scanned = 0 end
  295.     if not total_combinations or total_combinations <= 0 then
  296.         print("Error: Total combinations is invalid or zero.")
  297.         return
  298.     end
  299.    
  300.     -- Calculate progress percentage
  301.     local percentage = math.min((total_scanned / total_combinations) * 100, 100)
  302.    
  303.     -- Format time as HH:MM:SS
  304.     local hours = math.floor(time_remaining / 3600)
  305.     local minutes = math.floor((time_remaining % 3600) / 60)
  306.     local seconds = math.floor(time_remaining % 60)
  307.    
  308.     -- Use a cached progress bar
  309.     local filled_length = math.floor(percentage / 2)
  310.     local progress_bar = string.rep("#", filled_length) .. string.rep("-", 50 - filled_length)
  311.     local progress_line = string.format("[%-50s] %.2f%% | Scanned: %d / %d",
  312.         progress_bar, percentage, total_scanned, total_combinations)
  313.     local eta_line = string.format("ETA: %02d:%02d:%02d | Addresses per minute: %.2f",
  314.         hours, minutes, seconds, addresses_per_minute)
  315.  
  316.     -- Calculate screen positions
  317.     local screen_width, screen_height = term.getViewport()
  318.     local middle_line = math.floor(screen_height / 2)
  319.    
  320.     -- Draw the progress bar at the middle of the screen
  321.     term.setCursor((screen_width - #progress_line) // 2, middle_line)
  322.     io.write(progress_line)
  323.    
  324.     -- Draw the ETA line directly below the progress bar
  325.     term.setCursor((screen_width - #eta_line) // 2, middle_line + 1)
  326.     io.write(eta_line)
  327. end
  328.  
  329.  
  330.  
  331. -- Load progress specific to the gate type
  332. local state = load_progress()
  333. total_scanned = state.total_scanned or 0
  334. current_index = total_scanned + 1 -- Start from the next unscanned address
  335. gates_found = state.gates_found or 0
  336.  
  337.  
  338. -- Load assigned worker names and ports
  339. local function load_worker_names()
  340.     if fs.exists(worker_names_file) then
  341.         local file = io.open(worker_names_file, "r")                       
  342.         local data = file:read("*all")
  343.         file:close()
  344.         local success, loaded_data = pcall(serialization.unserialize, data)
  345.         if success and loaded_data then
  346.             worker_names = loaded_data
  347.             print("Worker names and ports loaded.")
  348.         else
  349.             print("Error loading worker names. Data may be corrupted.")
  350.         end
  351.     end
  352.    
  353.     -- Dynamically calculate next_worker_id based on existing workers
  354.     next_worker_id = 1
  355.     for _, data in pairs(worker_names) do
  356.         if data.name then
  357.             -- Extract the numeric ID from the worker name (e.g., "Worker 1")
  358.             local id = tonumber(data.name:match("Worker (%d+)"))
  359.             if id and id >= next_worker_id then
  360.                 next_worker_id = id + 1 -- Set next_worker_id to one greater than the highest ID
  361.             end
  362.         end
  363.     end
  364.     print("Initialized next_worker_id to:", next_worker_id)
  365.    
  366.     return worker_names
  367. end
  368.  
  369. -- Save assigned worker names and ports
  370. local function save_worker_names()
  371.     local file = io.open(worker_names_file, "w")
  372.     file:write(serialization.serialize(worker_names))
  373.     file:close()
  374.     print("Worker names and ports saved.")
  375. end
  376. -- Helper function to check if a port is already in use
  377. local function is_port_in_use(worker_names, port)
  378.     for _, worker_data in pairs(worker_names) do
  379.         if worker_data.port == port then
  380.             return true
  381.         end
  382.     end
  383.     return false
  384. end
  385. local function generate_worker_data(worker_names)
  386.     local name, port
  387.  
  388.     -- Loop to generate a unique worker name
  389.     repeat
  390.         name = "Worker " .. next_worker_id
  391.         next_worker_id = next_worker_id + 1
  392.     until not worker_names[name] -- Ensure the name is not already in use
  393.  
  394.     -- Loop to generate a unique worker port
  395.     repeat
  396.         port = next_port
  397.         next_port = next_port + 1
  398.     until not is_port_in_use(worker_names, port) -- Ensure the port is not already in use
  399.  
  400.     return name, port
  401. end
  402. local function generate_next_range()
  403.     if current_index > total_combinations then
  404.         return nil -- All ranges have been scanned
  405.     end
  406.  
  407.     local range_start = current_index
  408.     local range_end = math.min(current_index + batch_size - 1, total_combinations)
  409.     current_index = range_end + 1 -- Move to the next range
  410.  
  411.   --  print("Generated range:", range_start, "-", range_end)
  412.     return {start = range_start, finish = range_end}
  413. end
  414. -- Handle worker name and port requests
  415. local function handle_worker_name_request(worker_id, worker_address, mac_id)   
  416.     if worker_names[worker_id] then
  417.     -- Worker MAC Adress already in system
  418.         local worker_data = worker_names[worker_id]
  419.         -- Worker already has a name, port, and address assigned
  420.         local name = worker_data.name
  421.         local worker_port = worker_data.port
  422.         local stored_adress = worker_names[mac_id] and worker_names[mac_id].address
  423.         local stored_mac = worker_names[mac_id] and worker_names[mac_id].mac
  424.        
  425.         if stored_adress == worker_address and stored_mac == mac_id then
  426.             -- Worker network adress and mac adress checks out
  427.             modem.send(worker_address, master_port, "error_name_port", name, worker_port)
  428.             print("Existing name and port sent to worker:", name, worker_port, worker_id, worker_address)
  429.         elseif stored_address ~= worker_address and stored_mac == mac_id then
  430.             -- Worker network adress DOES NOT check out
  431.             print("Existing name and port sent to worker, new network adress saved")
  432.             modem.send(worker_address, master_port, "error_name_port", name, worker_port)
  433.             worker_names[worker_id] = { name = name, port = worker_port, address = worker_address, mac = mac_id }
  434.             save_worker_names()
  435.         elseif stored_adress == worker_address and stored_mac ~= mac_id then
  436.             -- Worker mac adress DOES NOT check out
  437.             print("Existing name and port sent to worker, new mac adress saved")
  438.             modem.send(worker_address, master_port, "error_name_port", name, worker_port)
  439.             worker_names[worker_id] = { name = name, port = worker_port, address = worker_address, mac = mac_id }
  440.             save_worker_names()
  441.         elseif stored_adress ~= worker_address and stored_mac ~= mac_id then
  442.             -- Worker network adress and mac adress DOES NOT check out
  443.             print("Existing name and port sent to worker, new network adress and mac adress saved")
  444.             modem.send(worker_address, master_port, "error_name_port", name, worker_port)
  445.             worker_names[worker_id] = { name = name, port = worker_port, address = worker_address, mac = mac_id }
  446.             save_worker_names()
  447.         else
  448.             print("Unexpected case encountered")
  449.         end
  450.     else
  451.         -- Assign new name and port
  452.         local new_name, new_port = generate_worker_data(worker_names)
  453.         print("Generated Worker Data:", new_name, new_port)
  454.         worker_names[worker_id] = { name = new_name, port = new_port, address = worker_address, mac = mac_id }
  455.         save_worker_names()
  456.         modem.send(worker_address, master_port, "assigned_name_port", new_name, new_port)
  457.         print("New name and port assigned to worker:", new_name, new_port, worker_id, worker_address)
  458.     end
  459. end
  460.  
  461. if total_scanned > 0 then
  462.     print("Resuming progress for", gate_type, ": Addresses scanned:", total_scanned, "| Gates found:", gates_found)
  463. else
  464.     print("No saved progress found. Starting fresh...")
  465. end
  466.  
  467. -- Start the Master Node
  468. load_progress()
  469. worker_names = load_worker_names()
  470. print("Master Node: Initializing...")
  471.  
  472. -- Symbols and combinations
  473.  
  474. total_combinations = calculate_total_combinations(#symbols, address_length)
  475. -- Function to assign tasks to ready workers
  476. local function assign_tasks_to_ready_workers()
  477.     for worker_id, worker_data in pairs(ready_workers) do
  478.         if worker_data.is_busy then
  479.             print("Skipping busy worker:", worker_data.name)
  480.             goto continue
  481.         end
  482.         -- Check for pending ranges first
  483.         if #pending_ranges > 0 then
  484.             range = table.remove(pending_ranges, 1)
  485.             print("Assigning pending range to worker:", worker_data.name)
  486.         else
  487.             range = generate_next_range()
  488.             if not range then
  489.                 print("All work has been assigned. Waiting for results...")
  490.                 return
  491.             end
  492.         end
  493.  
  494.         -- Check if worker data includes address and port
  495.         if worker_data.address and worker_data.port and worker_data.mac then               
  496.             -- Task data includes worker ID for validation
  497.             local task_data = {
  498.                 range = range,
  499.                 symbols = symbols,
  500.                 gate_type = gate_type,
  501.                 point_of_origin = point_of_origin,
  502.                 worker_id = worker_data.mac,
  503.                 address_length = address_length
  504.             }
  505.             -- Send task to worker's address and port
  506.             local sent = modem.send(worker_data.address, worker_data.port, "task", serialization.serialize(task_data))
  507.            
  508.             if sent then
  509.                 -- Wait for "ok" response or timeout
  510.                 print("Waiting for acknowledgment from", worker_data.name)
  511.                 local timeout = 5  -- Wait 5 seconds for "ok"
  512.                 local response_received = false
  513.                
  514.                 while true do
  515.                     local event_name, localAddress, remoteAddress, received_port, distance, message1, message2 = event.pull(timeout, "modem_message")
  516.                     if not event_name then
  517.                         -- Timeout reached
  518.                         print("Error: No acknowledgment from worker:", worker_data.name, "Assuming worker offline.")
  519.                         ready_workers[worker_id] = nil
  520.                         table.insert(pending_ranges, range) -- Return failed range next time
  521.                         return
  522.                     elseif remoteAddress == worker_data.address and message1 == "ok" then
  523.                         print("Acknowledgment received from worker:", worker_data.name)
  524.                         response_received = true
  525.                         worker_data.is_busy = true -- Mark worker as busy
  526.                         break
  527.                     elseif remoteAddress == worker_data.address and message1 == "reassign_range" then
  528.                         -- Reassign the range sent back by the worker
  529.                         local reassigned_data = serialization.unserialize(message2)
  530.                         if reassigned_data.range then
  531.                             --  print("Reassigning range:", reassigned_data.range.start, "-", reassigned_data.range.finish)
  532.                                 table.insert(pending_ranges, reassigned_data.range) -- Return failed range next time
  533.                         end
  534.                         print("Worker requested reassignment due to ID mismatch:", worker_data.name)
  535.                         ready_workers[worker_id] = nil  -- Remove the worker from the ready list
  536.                         break
  537.                     end
  538.                 end
  539.                 if response_received then
  540.                     print(worker_data.name, "acknowledged task. Marking as busy")
  541.                 end
  542.                 return
  543.             else
  544.                 print("Error: Could not send message")
  545.             end
  546.         else
  547.             print("Error: Worker data not found for ", worker_id)
  548.         end
  549.         ::continue::
  550.     end
  551. end
  552. -- Function to count ready workers
  553. local function count_ready_workers()
  554.     local count = 0
  555.     for _, worker_data in pairs(ready_workers) do
  556.         -- Only count workers that are not busy
  557.         if not worker_data.is_busy then
  558.             count = count + 1
  559.         end
  560.     end
  561.     return count
  562. end
  563. local function count_busy_workers()
  564.     local count = 0
  565.     for _, worker_data in pairs(ready_workers) do
  566.         -- Only count workers that are busy
  567.         if worker_data.is_busy then
  568.             count = count + 1
  569.         end
  570.     end
  571.     return count
  572. end
  573. -- Handle worker "ready" messages
  574. local function handle_ready_message(from, data)
  575.     local worker_data = serialization.unserialize(data)
  576.     local worker_id = worker_data.name  -- Use the worker name (ID) as the index
  577.     local mac = worker_data.mac
  578.     -- Validate incoming data
  579.     if worker_id and worker_data.port and from then
  580.         local stored_adress = worker_names[mac] and worker_names[mac].address
  581.         local stored_mac = worker_names[mac] and worker_names[mac].mac
  582.         if from == stored_adress and mac == stored_mac then
  583.             ready_workers[worker_id] = {
  584.             name = worker_data.name,
  585.             port = worker_data.port,
  586.             address = from, -- Use the correct 'from' as the worker's network address
  587.             mac = worker_data.mac
  588.             }
  589.             print("Worker ready:", worker_data.name, "| Port:", worker_data.port, "| Address:", from)
  590.             ready_workers[worker_id].is_busy = false
  591.         else
  592.             print("Error: Expected net-id:", stored_adress, "Got:", from)
  593.             modem.send(from, worker_data.port, "resync", "")
  594.             return
  595.         end
  596.     else
  597.         print("Error: Incomplete worker data received. Missing fields:", worker_id, worker_data.port, from)
  598.         return
  599.     end
  600.  
  601.     print("Current ready workers:", count_ready_workers())
  602.     print("Current busy workers:", count_busy_workers())
  603.     -- Assign tasks when workers become ready
  604.     assign_tasks_to_ready_workers()
  605. end
  606.  
  607. -- Handle worker "goodbye" messages
  608. local function handle_goodbye_message(from, data)
  609.     local worker_data = serialization.unserialize(data)
  610.     local worker_id = worker_data.name  -- Use the worker name (ID) as the index
  611.     print(worker_data.name, "sent goodbye. Removing from active workers.")
  612.     -- Verify if the sender matches the worker being removed
  613.     if ready_workers[worker_id] and ready_workers[worker_id].address == from then
  614.         ready_workers[worker_id] = nil  -- Remove the worker from the ready list
  615.         print("Worker successfully removed:", worker_data.name)
  616.     else
  617.         print("Warning: Mismatch or worker not found for goodbye message. Address:", from)
  618.     end
  619.  
  620.     print("Current active workers:", count_ready_workers())
  621.     print("Current busy workers:", count_busy_workers())
  622. end
  623. -- Function to shut down all workers
  624. local function shutdownall()
  625.     print("Initiating shutdown for all workers...")
  626.     for worker_id, worker_data in pairs(ready_workers) do
  627.         if worker_data and worker_data.address and worker_data.port then
  628.             print("Sending shutdown command to worker:", worker_data.name)
  629.             modem.open(worker_data.port) -- Open the worker's port
  630.             modem.send(worker_data.address, worker_data.port, "shutdown") -- Send the shutdown command
  631.             modem.close(worker_data.port) -- Close the worker's port
  632.         else
  633.             print("Warning: Missing data for worker:", worker_id)
  634.         end
  635.     end
  636.     print("Shutdown command sent to all workers.")
  637. end
  638. -- Listener to filter and enqueue messages
  639. local function message_listener()
  640.     while true do
  641.         local _, _, from, port, _, message, data = event.pull("modem_message")
  642.         if allowed_messages[message] then
  643.             enqueue_message({from = from, port = port, message = message, data = data})
  644.         elseif message == "ok" or "reassign_range" then
  645.             enqueue_message2({from = from, port = port, message = message, data = data})
  646.         else
  647.             print("Ignored message:", message, "from:", from)
  648.         end
  649.     end
  650. end
  651. -- Collect results from workers
  652. local function collect_results(total_combinations)
  653.     print("Master Node: Waiting for results...")
  654.     modem.open(master_port)
  655.     -- Table to track active ranges by worker
  656.     local active_ranges = {}
  657.         while true do
  658.             local message_data = dequeue_message()
  659.                 if message_data then
  660.                     local from = message_data.from
  661.                     local port = message_data.port
  662.                     local message = message_data.message
  663.                     local data = message_data.data
  664.                    
  665.                     if message == "result" then
  666.                         local result = serialization.unserialize(data)
  667.                         print("Results received from worker:", result.worker_id)
  668.            
  669.                         -- Save valid addresses
  670.                         for _, address in ipairs(result.valid_addresses) do
  671.                             save_address(address, result.gate_type or "MILKYWAY")
  672.                             progress.gates_found = progress.gates_found + 1
  673.                         end
  674.            
  675.                         -- Update progress
  676.                         local processed_count = result.range.finish - result.range.start + 1
  677.                         progress.total_scanned = (progress.total_scanned or 0) + processed_count
  678.                         addresses_validated = (addresses_validated or 0) + processed_count  -- Track validated addresses
  679.            
  680.  
  681.                         -- Mark range as completed
  682.                         active_ranges[result.worker_id] = nil          
  683.            
  684.                         -- Call ETA update after processing worker result
  685.                         update_eta(total_combinations)
  686.            
  687.                         save_progress(progress)  -- Save current progress state
  688.                         if ready_workers[result.worker_id] then
  689.                             ready_workers[result.worker_id].is_busy = false -- Mark worker as ready
  690.                             assign_tasks_to_ready_workers()
  691.                         else
  692.                             ready_workers[result.worker_id] = { name = result.worker_id, port = result.worker_port, mac = result.mac, address = from }
  693.                             ready_workers[result.worker_id].is_busy = false -- Mark worker as ready
  694.                             assign_tasks_to_ready_workers()
  695.                         end
  696.            
  697.                     elseif message == "error" then
  698.                         -- Handle error messages
  699.                         local worker_address = from
  700.                         local error_data = serialization.unserialize(data)
  701.                         print("Error reported by worker:", error_data.worker_id, "| Reason:", error_data.error)
  702.  
  703.                         -- Save any valid addresses sent before the error occurred
  704.                         for _, address in ipairs(error_data.valid_addresses) do
  705.                             save_address(address, error_data.gate_type or "MILKYWAY")
  706.                             progress.gates_found = progress.gates_found + 1
  707.                         end
  708.  
  709.                         if error_data.processed_ranges then
  710.                             local processed_range = error_data.processed_ranges  -- This holds {start, finish} of the completed range
  711.    
  712.                             -- Calculate the processed count and update progress
  713.                             local processed_count = processed_range.finish - processed_range.start + 1
  714.                             progress.total_scanned = (progress.total_scanned or 0) + processed_count
  715.                             addresses_validated = (addresses_validated or 0) + processed_count
  716.    
  717.                             -- Define the remaining range
  718.                             local remaining_range = {
  719.                                 start = processed_range.finish + 1,  -- Continue from the last processed point
  720.                                 finish = pending_ranges[1] and pending_ranges[1].finish or processed_range.finish + batch_size
  721.                             }
  722.    
  723.                             print("Reassigning remaining range to queue:", remaining_range.start, "-", remaining_range.finish)
  724.                             table.insert(pending_ranges, remaining_range)  -- Add to pending queue
  725.                         else
  726.                             print("Warning: No valid processed ranges received from worker:", error_data.worker_id)
  727.                         end
  728.  
  729.                         save_progress(progress)
  730.  
  731.                         -- Send shutdown to the worker
  732.                         print("Sending shutdown command to worker:", error_data.worker_id)
  733.                         modem.send(worker_address, error_data.port, "shutdown")
  734.                     elseif message == "request_name_port" then
  735.                         local result = serialization.unserialize(data)
  736.                         local modem_id = result.modemid
  737.                         local worker_id = result.id
  738.                         local mac_id = result.id
  739.                         print("Name and port request received from:", worker_id, "Address:", modem_id)
  740.                         if worker_id then
  741.                             handle_worker_name_request(worker_id, modem_id, mac_id)
  742.                         else
  743.                         print("Error: Received nil worker_id in request_name_port.")
  744.                         end
  745.                     elseif message == "ready" then
  746.                         -- Worker sends "ready" signal
  747.                         handle_ready_message(from, data)           
  748.                     elseif message == "goodbye" then
  749.                         -- Worker sends "goodbye" signal
  750.                         handle_goodbye_message(from, data)
  751.                     end
  752.                 end
  753.                 os.sleep(0)
  754.             end
  755.         end
  756. thread.create(message_listener)
  757. collect_results(total_combinations)
  758. print("Master Node: Scanning complete. Total gates found:", progress.gates_found)
  759.  
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement