3
\$\begingroup\$

Code

io.github.coderodde.util.Task.java:

package io.github.coderodde.util;

import java.util.Objects;

/**
 * A simple task class.
 * 
 * @author Rodion "rodde" Efremov
 * @version 1.0.0 ()
 * @since 1.0.0 ()
 */
public class Task {

    private final Runnable runnable;
    private final long waitTimeNanos;
    
    public Task(final Runnable runnable,
                final long waitTimeNanos) {
        
        this.runnable = Objects.requireNonNull(runnable,
                                               "The input runnable is null");
        this.waitTimeNanos = waitTimeNanos;
    }
    
    void run() {
        runnable.run();
    }
    
    long getWaitTimeNanos() {
        return waitTimeNanos;
    }
}

io.github.coderodde.util.TaskScheduler.java:

package io.github.coderodde.util;

import java.time.Duration;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/**
 * A simple task scheduler.
 * 
 * @author Rodion "rodde" Efremov
 * @version 1.0.0 (Dec 21, 2025)
 * @since 1.0.0 (Dec 21, 2025)
 */
public class TaskScheduler implements AutoCloseable {
    
    private static final long MAXIMUM_WAIT_NANOS = 1_000_000_000L;

    private final Set<DispatcherThread> dispatcherThreadSet = 
            ConcurrentHashMap.newKeySet();
    
    public void schedule(final Task task) {
        final DispatcherThread dispatcherThread = new DispatcherThread(task);
        dispatcherThread.start();
        dispatcherThreadSet.add(dispatcherThread);
    }

    @Override
    public void close() throws Exception {
        for (final DispatcherThread dispatcherThread : dispatcherThreadSet) {
            dispatcherThread.close();
        }
    }
    
    private final class DispatcherThread extends Thread {
        
        private long sleepDurationNanoseconds;
        private final Task task;
        private volatile boolean cancelled = false;
        
        DispatcherThread(final Task task) {
            this.task = task;
            this.sleepDurationNanoseconds = task.getWaitTimeNanos();
        }
        
        void close() {
            cancelled = true;
        } 
        
        public void run() {
            while (true) {
                final long waitNanos = Math.min(MAXIMUM_WAIT_NANOS,
                                                sleepDurationNanoseconds);
                
                if (waitNanos == 0L) {
                    break;
                }
                
                if (cancelled) {
                    return;
                }
                
                sleepDurationNanoseconds -= waitNanos;
                
                try {
                    Thread.sleep(Duration.ofNanos(waitNanos));
                } catch (final Exception ex) {

                }
            }
            
            if (cancelled) {
                return;
            }
            
            task.run();
            dispatcherThreadSet.remove(this);
        }
    }
}

io.github.coderodde.util.demo.Demo.java:

package io.github.coderodde.util.demo;

import io.github.coderodde.util.Task;
import io.github.coderodde.util.TaskScheduler;

/**
 * A small task scheduler demo.
 * 
 * @author Rodion "rodde" Efremov
 * @version 1.0.0 (Dec 21, 2025)
 * @since 1.0.0 (Dec 21, 2025)
 */
public class Demo {

    public static void main(String[] args) throws Exception {
        final TaskScheduler scheduler = new TaskScheduler();
         
        final Task task1 = new Task(() -> System.out.println("hello"),
                                    2_000_000_000L);
        
        final Task task2 = new Task(() -> System.out.println("world"), 
                                    4_000_000_000L);
        
        final Task task3 = new Task(() -> 
                System.out.println("You should not see this text."),
                100_000_000_000L);
        
        scheduler.schedule(task1);
        scheduler.schedule(task2);
        scheduler.schedule(task3);
        
        // This sleep ends 7 seconds after the start of the ENTIRE program,
        // i.e. 3 seconds after task2 has been carried out:
        try {
            Thread.sleep(7000L);
        } catch (final Exception ex) {
            
        }
        
        scheduler.close();
    }
}

Critique request

As always, I am eager to hear about any constructive commentary.

\$\endgroup\$

2 Answers

7
\$\begingroup\$

Bugs

Tasks may sleep longer than they should, because you assume that each wakeup after MAXIMUM_WAIT_NANOS happens immediately, which presumes that a CPU is idle at that instant. If no CPU is idle, the thread has to wait for one, and more time may have elapsed than you subtract from sleepDurationNanoseconds. You should consult a clock to determine the elapsed time instead.
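
A minimal sketch of the clock-based approach, assuming the monotonic clock behind System.nanoTime() is acceptable: the deadline is computed once, and the remaining time is re-read from the clock after every wakeup, so late wakeups cannot accumulate. DeadlineSleeper and sleepNanos are hypothetical names; the one-second cap mirrors MAXIMUM_WAIT_NANOS from the question.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of the reviewed code.
final class DeadlineSleeper {

    private static final long MAXIMUM_WAIT_NANOS = 1_000_000_000L;

    // Returns true once at least totalNanos have elapsed,
    // or false if the calling thread was interrupted first.
    static boolean sleepNanos(final long totalNanos) {
        final long deadline = System.nanoTime() + totalNanos;
        long remaining;

        // nanoTime() values are compared by subtraction, which stays
        // correct even if the counter overflows.
        while ((remaining = deadline - System.nanoTime()) > 0L) {
            try {
                TimeUnit.NANOSECONDS.sleep(
                        Math.min(MAXIMUM_WAIT_NANOS, remaining));
            } catch (final InterruptedException ex) {
                // Keep the interrupt visible to the caller.
                Thread.currentThread().interrupt();
                return false;
            }
        }

        return true;
    }
}
```

Because the loop condition asks the clock rather than a counter, a wakeup that arrives late simply shortens the next sleep.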

When TaskScheduler.close sets cancelled = true, it does so through a data race. As a consequence, DispatcherThreads may not see this update, and continue on their merry way.

If a task is scheduled while another thread is closing the scheduler, the thread for this task is added to dispatcherThreadSet while that set is being iterated, which may result in the iterator not visiting this thread, which is therefore not notified of the closure, and proceeds to execute the task in spite of the cancellation.

If task.run() throws an exception, the DispatcherThread is never removed from dispatcherThreadSet, leaking memory.
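
One way to avoid the leak, sketched under the assumption that each task still gets its own thread, is to put the removal in a finally block. SelfRemovingWorkers is a hypothetical, stripped-down stand-in for the scheduler's bookkeeping, not the reviewed class.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the scheduler's thread bookkeeping.
final class SelfRemovingWorkers {

    private final Set<Thread> workers = ConcurrentHashMap.newKeySet();

    Thread start(final Runnable task) {
        final Thread worker = new Thread(() -> {
            try {
                task.run();
            } finally {
                // Runs even when task.run() throws, so the entry
                // is always removed from the set.
                workers.remove(Thread.currentThread());
            }
        });

        // Register before starting, so the removal above cannot
        // happen before the thread has been added.
        workers.add(worker);
        worker.start();
        return worker;
    }

    int liveWorkerCount() {
        return workers.size();
    }
}
```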

Poor Performance

You are creating a new platform thread for every task being scheduled. This allocates space for the entire thread stack, and is quite expensive for short tasks. Consider using a virtual thread instead.
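
A rough sketch of what that could look like, assuming Java 21 or later. VirtualDelay and runAfter are hypothetical names, and treating an interrupt as "do not run the task" is an assumption of this sketch rather than behavior of the reviewed code.

```java
import java.time.Duration;

// Hypothetical helper; requires Java 21 or later.
final class VirtualDelay {

    // Starts a virtual thread that sleeps for `delay` and then runs `task`.
    static Thread runAfter(final Duration delay, final Runnable task) {
        return Thread.ofVirtual().start(() -> {
            try {
                // Sleeping parks the virtual thread instead of
                // blocking an operating system thread.
                Thread.sleep(delay);
            } catch (final InterruptedException ex) {
                // Assumption of this sketch: an interrupt cancels the task.
                return;
            }

            task.run();
        });
    }
}
```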

Even if the DispatcherThreads notice the cancellation request, they are only checking it once per second, so there will be up to a second delay until they terminate. And waking up the threads just to check the flag is a bit inefficient.
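
One possible way to make a waiting thread react to cancellation immediately, without periodic wakeups, is a timed wait on a CountDownLatch. This is a sketch under that assumption; CancellableDelay, awaitDelay and cancel are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: a delay that can be cut short from another thread.
final class CancellableDelay {

    private final CountDownLatch cancelled = new CountDownLatch(1);

    // Wakes up every thread currently blocked in awaitDelay().
    void cancel() {
        cancelled.countDown();
    }

    // Returns true if the full delay elapsed, or false if cancel() was
    // called (before or during the wait) or the thread was interrupted.
    boolean awaitDelay(final long delayNanos) {
        try {
            // await() returns true when the latch reached zero (cancelled)
            // and false when the timeout expired first.
            return !cancelled.await(delayNanos, TimeUnit.NANOSECONDS);
        } catch (final InterruptedException ex) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

A single latch shared by all waiting threads fits the observation that the cancellation state is the same for every task.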

Cumbersome API

scheduler.schedule(new Task(whatever, 1_000_000_000));

Why are you saddling the caller with calling a task constructor every time they want to submit a task?

Specifying long wait durations results in very long numbers. Suppose I want to wait an hour:

scheduler.schedule(new Task(whatever, 3_600_000_000_000L));

This many zeroes make me dizzy, and it's easy to get wrong by writing one group of _000 too few or too many.

If I were to specify such an API, I'd probably accept:

scheduler.schedule(whatever, Duration.ofHours(1));
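
If the nanosecond-based internals are kept, the conversion can live in one place. A small sketch, where DurationArguments and toWaitNanos are hypothetical names and rejecting negative delays is an assumption of the sketch:

```java
import java.time.Duration;

// Hypothetical adapter between a Duration-based API and nanosecond internals.
final class DurationArguments {

    // Converts a delay to the nanosecond count the question's Task expects.
    static long toWaitNanos(final Duration delay) {
        if (delay.isNegative()) {
            throw new IllegalArgumentException("Negative delay: " + delay);
        }

        // toNanos() throws ArithmeticException instead of overflowing silently.
        return delay.toNanos();
    }
}
```

With this, Duration.ofHours(1) yields the same 3_600_000_000_000 nanoseconds as the literal, without anyone having to count zeroes.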

Misleading Naming

The DispatcherThread does not dispatch, at least not in the sense of "sending off promptly". I'd probably call it WorkerThread instead.

Style

  • Task could be a record.
  • DispatcherThread should probably favor composition over inheritance, and is arguably unnecessary, since cancelled is always the same for all DispatcherThreads.
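
For the first point, a record version could look roughly like this, assuming Java 16 or later. The null check and its message come from the question; the check for negative wait times is an addition of this sketch.

```java
import java.util.Objects;

// Sketch of Task as a record; requires Java 16 or later.
record Task(Runnable runnable, long waitTimeNanos) {

    // Compact canonical constructor: runs before the fields are assigned.
    Task {
        Objects.requireNonNull(runnable, "The input runnable is null");

        // Not in the original class: an assumption of this sketch.
        if (waitTimeNanos < 0L) {
            throw new IllegalArgumentException(
                    "Negative wait time: " + waitTimeNanos);
        }
    }

    void run() {
        runnable.run();
    }
}
```

The record generates the accessors runnable() and waitTimeNanos(), so getWaitTimeNanos() is no longer needed.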

Simplicity

You call this a "simple" TaskScheduler, but you've actually written 3 different classes, and the code is quite long for what it does.

Can we do better?

Here's how I'd do it:

import java.time.Duration;
import java.time.Instant;

public class SimpleThreadScheduler implements AutoCloseable {
    private final Object lock = new Object();
    // guarded by `lock`
    private boolean closed;
    
    public void schedule(Runnable task, Duration delay) {
        synchronized (lock) {
            if (closed) {
                throw new IllegalStateException("This scheduler has been closed");
            }           
        }
        
        var at = Instant.now().plus(delay);
        Thread.ofVirtual().start(() -> {
            synchronized (lock) {
                Duration remaining;
                while ((remaining = Duration.between(Instant.now(), at)).isPositive() && !closed) {
                    var millis = remaining.toMillis();
                    var nanos = remaining.toNanosPart() % 1_000_000;
                    try {
                        lock.wait(millis, nanos); 
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                }
                if (closed) {
                    return;
                }
            }
            task.run();
        });
    }
    
    @Override
    public void close() {
        synchronized (lock) {
            closed = true;
            lock.notifyAll();
        }
    }
}

I call Instant.now() twice to account for the delay until the worker thread begins execution (which may be significant if no CPU is currently available to run the new thread).

I use wait/notifyAll to notify the waiting threads, and locking to make closing atomic. The while loop is necessary due to spurious wakeups.

All access to closed happens within synchronized to establish a synchronizes-with (and therefore happens-before) among the various threads to ensure that writes are visible to all involved threads.

\$\endgroup\$
3
  • \$\begingroup\$ What should I be aware of when doing t.interrupt()? \$\endgroup\$ Commented Dec 22, 2025 at 8:20
  • \$\begingroup\$ Mostly what the Javadoc tells you: it only has an effect if the other thread is blocked (or checks its interrupted status manually), and it causes that thread to throw, so that thread had better be somewhere that handles exceptions gracefully. Don't interrupt unknown code whose exception safety is in doubt. \$\endgroup\$ Commented Dec 22, 2025 at 18:08
  • \$\begingroup\$ On second thought, that actually is a problem here, because there is no way to know that a "waiting" thread has actually gone to sleep. Also, that solution inherited another bug from your approach (iteration over a ConcurrentHashMap is not atomic), so I switched to wait/notifyAll to solve these issues. \$\endgroup\$ Commented Dec 22, 2025 at 21:50
4
\$\begingroup\$

DRY

In Demo.main, I have to wonder why task1, task2, and task3 are not simply elements of an array that you could loop over.

E.g.

        final TaskScheduler scheduler = new TaskScheduler();
         
        final Task[] tasks = {
            new Task(
                () -> System.out.println("hello"),
                2_000_000_000L
            ),
            new Task(
                () -> System.out.println("world"), 
                4_000_000_000L
            ),
            new Task(
                () -> System.out.println("You should not see this text."),
                100_000_000_000L
            )
        };
        
        for (final Task task : tasks) {
            scheduler.schedule(task);
        }

I'm going to ask this kind of question anytime I see a variable1, variable2, variable3 pattern.

You might also have a private method in Demo which builds these very simple tasks, such that you could write something like the following.

        final Task[] tasks = {
            printerTask("hello", 2_000_000_000L),
            printerTask("world", 4_000_000_000L),
            printerTask("You should not see this text.", 100_000_000_000L)
        };
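
Such a helper might look like the sketch below. DemoHelpers is a hypothetical container, and the nested Task is only a minimal stand-in for the question's class so that the example compiles on its own.

```java
// Hypothetical container for the demo helper.
final class DemoHelpers {

    // Minimal stand-in for the question's Task class.
    static final class Task {
        final Runnable runnable;
        final long waitTimeNanos;

        Task(final Runnable runnable, final long waitTimeNanos) {
            this.runnable = runnable;
            this.waitTimeNanos = waitTimeNanos;
        }
    }

    // Builds a task that prints `text` once its wait time has passed.
    static Task printerTask(final String text, final long waitTimeNanos) {
        return new Task(() -> System.out.println(text), waitTimeNanos);
    }
}
```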

Exception

In several places you declare that a method may throw Exception, and in several others you catch Exception. Can you narrow these down to something more specific?

Catching (and doing nothing with) any exception may lead to behavior you're not expecting.
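
For the sleeps in the question, the only checked exception involved is InterruptedException. A minimal sketch of catching just that, with NarrowCatch and pause as hypothetical names:

```java
// Hypothetical helper showing a narrow catch around Thread.sleep.
final class NarrowCatch {

    // Returns true if the full pause elapsed, false if interrupted.
    static boolean pause(final long millis) {
        try {
            Thread.sleep(millis);
            return true;
        } catch (final InterruptedException ex) {
            // Restore the flag so callers can still see the interrupt.
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Any other exception now propagates instead of disappearing into an empty catch block.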

\$\endgroup\$
1
  • 1
    \$\begingroup\$ To me, catching an exception means "I know how to fix this, and I am the right one to do it". Thus, catching Exception means "I know how to fix every imaginable failure that could possibly happen in the whole universe, and no one else can fix any of them", which is almost never the case. (The only exception(hah!) would be something like the debugger in an IDE or the top-level run loop in a REPL which would catch any failure in order to display it to the user and gracefully exit – and even then you could ask yourself whether that's really necessary since the JVM does just that anyway.) \$\endgroup\$ Commented Dec 22, 2025 at 18:54
