Igor Kromin |   Consultant. Coder. Blogger. Tinkerer. Gamer.

I make heavy use of ThreadPoolExecutor for a number of web services that I look after on my current project. This has been working great without much customisation for years but now we decided to tighten some of the service behaviour. With the old implementation it was possible to get a read timeout up the product stack waiting for a response from a service while the service happily continued executing tasks in the background. To stop this background execution, it was decided to implement forced timeouts at the thread pool level for each service.

The actual implementation was a lot more robust and testable, this example simply demonstrates the concept with the minimal amount of code.

The end result that we were trying to achieve was to have a ThreadPoolExecutor that, without changes to the standard functions or interfaces, would automatically cancel any Futures that may still be running after a given timeout. Even though the Future interface provided a get() method with a timeout, it was not desirable to modify every place where the no-parameter get() was called, so it was important to retain the standard interface while gaining automatic timeouts.

Extending ThreadPoolExecutor was fairly trivial. First, two variables were added to keep track of the timeout value and time unit. Then, a new setter was added to set these timeouts. The rest of the changes were to overwrite all of the submit() methods to make use of a custom Future implementation that was aware of timeouts. The new class MyFuture was a wrapper for the Future instance that the ThreadPoolExecutor super methods returned.
 MyThreadPoolExecutor.java
public class MyThreadPoolExecutor extends ThreadPoolExecutor {
private long timeout = -1;
private TimeUnit timeUnit = TimeUnit.SECONDS;
/* Super constructors not shown */
public void setTimeout(long timeout, TimeUnit timeUnit) {
this.timeout = timeout;
this.timeUnit = timeUnit;
}
@Override
public <T> Future<T> submit(Callable<T> task) {
return new MyFuture<>(super.submit(task), timeout, timeUnit);
}
@Override
public <T> Future<T> submit(Runnable task, T result) {
return new MyFuture<>(super.submit(task, result), timeout, timeUnit);
}
@Override
public Future<?> submit(Runnable task) {
return new MyFuture<>(super.submit(task), timeout, timeUnit);
}
}




The custom Future implementation, MyFuture was a little bit more involved. Since Future was an interface, it was necessary to make use of the delegation pattern.
 MyFuture.java
public class MyFuture<V> implements Future<V> {
private Future<V> delegate;
private long timeout;
private TimeUnit timeUnit;
public MyFuture(Future delegate, long timeout, TimeUnit timeUnit) {
this.delegate = delegate;
this.timeout = timeout;
this.timeUnit = timeUnit;
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return delegate.cancel(mayInterruptIfRunning);
}
@Override
public boolean isCancelled() {
return delegate.isCancelled();
}
@Override
public boolean isDone() {
return delegate.isDone();
}
@Override
public V get() throws InterruptedException, ExecutionException {
try {
if (timeout > 0) {
return delegate.get(timeout, timeUnit);
}
return delegate.get();
}
catch (TimeoutException e) {
this.cancel(true);
throw new ExecutionException(
"Forced timeout after " + timeout + " " + timeUnit.name(), null);
}
}
@Override
public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
try {
return delegate.get(timeout, unit);
}
catch (TimeoutException e) {
this.cancel(true);
throw new ExecutionException(
"Timeout after " + timeout + " " + unit.name(), null);
}
}
}



The MyFuture class delegated most of the implemented methods to the delegate Future object without additional logic. It also kept track of the timeout and time units values, which were the same as in the MyThreadPoolExecutor class.

The no-argument get() method had additional logic that implemented timeouts and task cancellation if timeouts were set. The other get() method had similar logic except it used timeout values that were passed into it. The trick in both was to cancel() the running task, which the default implementation did not do.

That's all there was to it. The changes were minimal and all of the services could instantly make use of this new functionality. Not bad for a few minutes work!

-i

Skip down to comments...
Hope you found this post useful...

...so please read on! I love writing articles that provide beneficial information, tips and examples to my readers. All information on my blog is provided free of charge and I encourage you to share it as you wish. There is a small favour I ask in return however - engage in comments below, provide feedback, and if you see mistakes let me know.

If you want to show additional support and help me pay for web hosting and domain name registration, donations, no matter how small, are always welcome!

Use of any information contained in this blog post/article is subject to this disclaimer.
 
comments powered by Disqus
Other posts you may like...