Advertisement
samuelask

Untitled

Dec 19th, 2024
18
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 4.59 KB | None | 0 0
  1. local function collect_results(total_combinations)
  2. print("Master Node: Waiting for results...")
  3. modem.open(master_port)
  4. -- Table to track active ranges by worker
  5. local active_ranges = {}
  6. local message_queue = {}
  7.  
  8. -- Listener loop to enqueue messages
  9. local listener_thread = coroutine.create(function()
  10. while true do
  11. local _, _, from, port, _, message, data = event.pull("modem_message")
  12. table.insert(message_queue, {from = from, port = port, message = message, data = data})
  13. end
  14. end)
  15. local processor_thread = coroutine.create(function()
  16. while true do
  17. if #message_queue > 0 then
  18. local next_message = table.remove(message_queue, 1)
  19. local from = next_message.from
  20. local port = next_message.port
  21. local message = next_message.message
  22. local data = next_message.data
  23.  
  24. if message == "result" then
  25. local result = serialization.unserialize(data)
  26. print("Results received from worker:", result.worker_id)
  27.  
  28. -- Save valid addresses
  29. for _, address in ipairs(result.valid_addresses) do
  30. save_address(address, result.gate_type or "MILKYWAY")
  31. progress.gates_found = progress.gates_found + 1
  32. end
  33.  
  34. -- Update progress
  35. local processed_count = result.range.finish - result.range.start + 1
  36. progress.total_scanned = (progress.total_scanned or 0) + processed_count
  37. addresses_validated = (addresses_validated or 0) + processed_count -- Track validated addresses
  38.  
  39.  
  40. -- Mark range as completed
  41. active_ranges[result.worker_id] = nil
  42.  
  43. -- Call ETA update after processing worker result
  44. update_eta(total_combinations)
  45.  
  46. save_progress(progress) -- Save current progress state
  47. ready_workers[result.worker_id].is_busy = false
  48. -- Mark worker as ready
  49. ready_workers[from] = { name = result.worker_id, port = result.worker_port, mac = result.mac, address = from }
  50. assign_tasks_to_ready_workers()
  51.  
  52. elseif message == "error" then
  53. -- Handle error messages
  54. local worker_address = from
  55. local error_data = serialization.unserialize(data)
  56. print("Error reported by worker:", error_data.worker_id, "| Reason:", error_data.error)
  57.  
  58. -- Save any valid addresses sent before the error occurred
  59. for _, address in ipairs(error_data.valid_addresses) do
  60. save_address(address, error_data.gate_type or "MILKYWAY")
  61. progress.gates_found = progress.gates_found + 1
  62. end
  63.  
  64. if error_data.processed_ranges then
  65. local processed_range = error_data.processed_ranges -- This holds {start, finish} of the completed range
  66.  
  67. -- Calculate the processed count and update progress
  68. local processed_count = processed_range.finish - processed_range.start + 1
  69. progress.total_scanned = (progress.total_scanned or 0) + processed_count
  70. addresses_validated = (addresses_validated or 0) + processed_count
  71.  
  72. -- Define the remaining range
  73. local remaining_range = {
  74. start = processed_range.finish + 1, -- Continue from the last processed point
  75. finish = pending_ranges[1] and pending_ranges[1].finish or processed_range.finish + batch_size
  76. }
  77.  
  78. print("Reassigning remaining range to queue:", remaining_range.start, "-", remaining_range.finish)
  79. table.insert(pending_ranges, remaining_range) -- Add to pending queue
  80. else
  81. print("Warning: No valid processed ranges received from worker:", error_data.worker_id)
  82. end
  83.  
  84. save_progress(progress)
  85.  
  86. -- Send shutdown to the worker
  87. print("Sending shutdown command to worker:", error_data.worker_id)
  88. modem.send(worker_address, error_data.port, "shutdown")
  89. elseif message == "request_name_port" then
  90. local result = serialization.unserialize(data)
  91. local modem_id = result.modemid
  92. local worker_id = result.id
  93. local mac_id = result.id
  94. print("Name and port request received from:", worker_id, "Address:", modem_id)
  95. if worker_id then
  96. handle_worker_name_request(worker_id, modem_id, mac_id)
  97. else
  98. print("Error: Received nil worker_id in request_name_port.")
  99. end
  100. elseif message == "ready" then
  101. -- Worker sends "ready" signal
  102. handle_ready_message(from, data)
  103. elseif message == "goodbye" then
  104. -- Worker sends "goodbye" signal
  105. handle_goodbye_message(from, data)
  106. end
  107. end
  108. end)
  109. end
  110. -- Run both threads
  111. coroutine.resume(listener_thread)
  112. coroutine.resume(processor_thread)
  113. end
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement