Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
--- Master-side receive loop: wait for worker messages and handle each one.
--
-- Opens `master_port` on the modem, then blocks on
-- `event.pull("modem_message")` forever and dispatches every message by its
-- first payload value:
--   "result"            store found addresses, update progress, mark the
--                       worker ready and hand out more work
--   "error"             store partial results, re-queue the unfinished range
--                       and send "shutdown" to the worker
--   "request_name_port" forwarded to handle_worker_name_request
--   "ready"             forwarded to handle_ready_message
--   "goodbye"           forwarded to handle_goodbye_message
-- Any other message is ignored.
--
-- Messages are handled inline, one per pulled event. A separate queue with
-- listener/processor coroutines is deliberately avoided: each coroutine would
-- have to yield to the other explicitly, and a consumer that polls an empty
-- queue without yielding starves the listener.
--
-- Relies on names defined outside this function: modem, event, serialization,
-- master_port, progress, addresses_validated, pending_ranges, batch_size,
-- ready_workers, save_address, save_progress, update_eta,
-- assign_tasks_to_ready_workers, handle_worker_name_request,
-- handle_ready_message and handle_goodbye_message.
--
-- @param total_combinations value passed through to update_eta after each
--        "result" message
-- @return does not return normally; the loop only ends if a callee raises
local function collect_results(total_combinations)
  print("Master Node: Waiting for results...")
  modem.open(master_port)

  -- Decode a serialized payload. Returns the table, or nil when the data
  -- cannot be unserialized into a table (malformed or missing payload).
  local function decode(data)
    local ok, payload = pcall(serialization.unserialize, data)
    if ok and type(payload) == "table" then
      return payload
    end
    return nil
  end

  -- Persist every address listed in payload.valid_addresses (if any) and
  -- bump the gates_found counter once per address.
  local function store_addresses(payload)
    local gate_type = payload.gate_type or "MILKYWAY"
    for _, address in ipairs(payload.valid_addresses or {}) do
      save_address(address, gate_type)
      progress.gates_found = (progress.gates_found or 0) + 1
    end
  end

  -- Add the size of an inclusive {start, finish} range to the scan counters.
  local function count_scanned(range)
    local processed_count = range.finish - range.start + 1
    progress.total_scanned = (progress.total_scanned or 0) + processed_count
    addresses_validated = (addresses_validated or 0) + processed_count
  end

  local handlers = {}

  -- A worker finished its batch.
  handlers["result"] = function(from, data)
    local result = decode(data)
    if not result then
      print("Warning: Ignoring malformed result message from:", from)
      return
    end
    print("Results received from worker:", result.worker_id)

    store_addresses(result)
    if type(result.range) == "table" then
      count_scanned(result.range)
    else
      print("Warning: No range received from worker:", result.worker_id)
    end

    update_eta(total_combinations)
    save_progress(progress)

    -- An entry keyed by worker id may not exist (the entry written below is
    -- keyed by sender address), so only clear the flag when it is there.
    local previous = ready_workers[result.worker_id]
    if previous then
      previous.is_busy = false
    end

    -- Mark the worker as ready and hand out the next batch.
    ready_workers[from] = {
      name = result.worker_id,
      port = result.worker_port,
      mac = result.mac,
      address = from,
    }
    assign_tasks_to_ready_workers()
  end

  -- A worker aborted its batch.
  handlers["error"] = function(from, data)
    local error_data = decode(data)
    if not error_data then
      print("Warning: Ignoring malformed error message from:", from)
      return
    end
    print("Error reported by worker:", error_data.worker_id, "| Reason:", error_data.error)

    -- Keep whatever the worker found before it failed.
    store_addresses(error_data)

    local processed_range = error_data.processed_ranges -- {start, finish} that was completed
    if processed_range then
      count_scanned(processed_range)

      -- NOTE(review): the end of the re-queued range is taken from the first
      -- pending range when one exists, otherwise one batch past the processed
      -- part. Confirm this matches the range originally assigned to the worker.
      local remaining_range = {
        start = processed_range.finish + 1,
        finish = pending_ranges[1] and pending_ranges[1].finish
          or processed_range.finish + batch_size,
      }
      print("Reassigning remaining range to queue:", remaining_range.start, "-", remaining_range.finish)
      table.insert(pending_ranges, remaining_range)
    else
      print("Warning: No valid processed ranges received from worker:", error_data.worker_id)
    end

    save_progress(progress)

    -- NOTE(review): the reply port is read from `port` here, while "result"
    -- payloads carry `worker_port`; verify against the worker script.
    print("Sending shutdown command to worker:", error_data.worker_id)
    modem.send(from, error_data.port, "shutdown")
  end

  -- A worker asks for a name and port assignment.
  handlers["request_name_port"] = function(from, data)
    local request = decode(data)
    if not request then
      print("Warning: Ignoring malformed request_name_port message from:", from)
      return
    end
    local modem_id = request.modemid
    local worker_id = request.id
    -- NOTE(review): mac_id is read from the same field as worker_id; confirm
    -- whether the worker sends a separate MAC field.
    local mac_id = request.id
    print("Name and port request received from:", worker_id, "Address:", modem_id)
    if worker_id then
      handle_worker_name_request(worker_id, modem_id, mac_id)
    else
      print("Error: Received nil worker_id in request_name_port.")
    end
  end

  handlers["ready"] = function(from, data)
    handle_ready_message(from, data)
  end

  handlers["goodbye"] = function(from, data)
    handle_goodbye_message(from, data)
  end

  while true do
    -- Values 3, 6 and 7 of the pulled event are used as sender address,
    -- message type and serialized payload respectively.
    local _, _, from, _, _, message, data = event.pull("modem_message")
    local handler = handlers[message]
    if handler then
      handler(from, data)
    end
  end
end
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement