Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import java.io.IOException;
- import java.util.*;
- import java.util.concurrent.atomic.AtomicBoolean;
- abstract class Task {
- abstract String taskId();
- abstract void doWork() throws IOException;
- }
- class SchedulerWithQueue {
- private final Queue<Task> taskQueue = new LinkedList<>();
- private final Set<Task> finishedTasks = new HashSet<>();
- private final Set<Task> runningTasks = new HashSet<>();
- private final AtomicBoolean failureDetected = new AtomicBoolean(false);
- private final Object lock = new Object();
- // Start execution
- public void execute() throws InterruptedException {
- synchronized (lock) {
- Collection<Task> initialTasks = nextTasks(List.of());
- if (initialTasks.isEmpty()) {
- return; // No tasks to execute
- }
- taskQueue.addAll(initialTasks);
- lock.notifyAll(); // Notify waiting threads
- }
- // Scheduler thread
- while (true) {
- Task task;
- // Fetch a task from the queue
- synchronized (lock) {
- while (taskQueue.isEmpty() && !shouldStop()) {
- lock.wait(); // Wait for new tasks or stop condition
- }
- if (shouldStop()) {
- break; // Exit loop if no more tasks or failure
- }
- task = taskQueue.poll();
- runningTasks.add(task);
- }
- // Run the task in a new thread
- Runnable runnable = createCustomRunnable(task);
- new Thread(runnable).start();
- }
- }
- // Determine when to stop the scheduler
- private synchronized boolean shouldStop() {
- return (taskQueue.isEmpty() && runningTasks.isEmpty() && nextTasks(finishedTasks).isEmpty())
- || failureDetected.get();
- }
- // Create a runnable to execute a task
- private Runnable createCustomRunnable(Task task) {
- return () -> {
- try {
- if (!failureDetected.get()) {
- task.doWork();
- }
- onTaskComplete(task);
- } catch (Exception e) {
- failureDetected.set(true); // Mark failure
- } finally {
- synchronized (lock) {
- lock.notifyAll(); // Notify scheduler of task completion
- }
- }
- };
- }
- // Handle task completion
- private void onTaskComplete(Task task) {
- synchronized (lock) {
- runningTasks.remove(task);
- finishedTasks.add(task);
- if (!failureDetected.get()) {
- // Fetch new tasks if no failure
- Collection<Task> dependentTasks = nextTasks(finishedTasks);
- for (Task dependentTask : dependentTasks) {
- if (!runningTasks.contains(dependentTask) && !finishedTasks.contains(dependentTask)) {
- taskQueue.add(dependentTask);
- }
- }
- lock.notifyAll(); // Notify scheduler of new tasks
- }
- }
- }
- // Simulated API for fetching dependent tasks
- private Collection<Task> nextTasks(Collection<Task> completedTasks) {
- // Assume this method is implemented
- return Collections.emptyList();
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement