EmailLinkedInGoogle+TwitterFacebook

Source code discussed in this post can be freely downloaded and used in whatever way or form you want.  SVN URL is, svn checkout http://whiteboardjunkie.googlecode.com/svn/trunk/parallelly parallelly-read-only

I put together couple of classes to ease testing pieces of code under concurrency load.  It is generic enough and accommodates a certain level of flexibility.  It is reusable for your specific purpose and completely eliminates the need to write plumbing code to manage concurrent execution. I found it way more simpler than some of the nonstandard ‘frameworks’ solving the same problem.   What it does not do currently is to watch for thread locks.  A bit more hacking will add that feature.  Another day.

	@Test
	public void testServerUnderLoad(){
		final JumbleServer jumbleServer = new JumbleServer();
		CompletionServiceForConcurrencyTest<String> resultService = new CompletionServiceForConcurrencyTest<String>(Executors.newCachedThreadPool());
		for (int i = 0 ; i < MAX_THREAD_COUNT; i++){
			resultService.submit(
				new ConcurrentTestCallable<String>(new Object[]{
														UUID.randomUUID().toString()
													}) {
					@Override
					String businessLogic() throws Exception{
						return jumbleServer.jumbleString((String)(parameters[0]));
					}
					@Override
					boolean isResultValid(String output) throws Exception {
						String input = (String)(parameters[0]);
						char[] inputChars = input.toCharArray();
						char[] outputChars = output.toCharArray();
						Arrays.sort(inputChars);
						Arrays.sort(outputChars);
						//Once sorted both char[] should be the same :) 
						if (new String(inputChars).equals(new String(outputChars)))
							return true;
						return false;
					}
			});
		}
		Assert.assertTrue(resultService.test());
	}

That is all.  This tests, instantiates the class to test and calls the method it wants to test in  MAX_THREAD_COUNT number of threads.  Execution is managed through a cached thread pool.  It collects back results from all threads and validates each of the threads worked as expected.  You need to implement two methods specific to your test case viz.

abstract T businessLogic() throws Exception;
abstract boolean isResultValid(T output) throws Exception;

Finally it prints out a summary of test execution.  The results look something like this.  For ease of verification I have added random failure as a feature to simulate Exception conditions for some threads.

-------------------------------------------------------
 T E S T S
-------------------------------------------------------
Running org.boni.parallely.service.ConcurrencyTestJig
Jan 12, 2012 5:53:46 PM org.boni.parallely.service.CompletionServiceForConcurrencyTest test
SEVERE: Exception while executing thread
java.lang.Exception: This is not a good time to Jumble
        at org.boni.parallely.service.ConcurrencyTestJig$JumbleServer.jumbleString(ConcurrencyTestJig.java:28)
        at org.boni.parallely.service.ConcurrencyTestJig$1.businessLogic(ConcurrencyTestJig.java:63)
        at org.boni.parallely.service.ConcurrencyTestJig$1.businessLogic(ConcurrencyTestJig.java:60)
        at org.boni.parallely.service.ConcurrentTestCallable.call(ConcurrentTestCallable.java:14)
        at org.boni.parallely.service.ConcurrentTestCallable.call(ConcurrentTestCallable.java:5)
        at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
        at java.util.concurrent.FutureTask.run(FutureTask.java:138)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
        at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
        at java.util.concurrent.FutureTask.run(FutureTask.java:138)
        at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
Concurrency test summary : TOTAL_THREADS:26, SUCCESS:24, FAILURE:2
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
        at java.lang.Thread.run(Thread.java:619)
Jan 12, 2012 5:53:46 PM org.boni.parallely.service.CompletionServiceForConcurrencyTest test
SEVERE: Exception while executing thread
java.lang.Exception: This is not a good time to Jumble
        at org.boni.parallely.service.ConcurrencyTestJig$JumbleServer.jumbleString(ConcurrencyTestJig.java:28)
        at org.boni.parallely.service.ConcurrencyTestJig$1.businessLogic(ConcurrencyTestJig.java:63)
        at org.boni.parallely.service.ConcurrencyTestJig$1.businessLogic(ConcurrencyTestJig.java:60)
        at org.boni.parallely.service.ConcurrentTestCallable.call(ConcurrentTestCallable.java:14)
        at org.boni.parallely.service.ConcurrentTestCallable.call(ConcurrentTestCallable.java:5)
        at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
        at java.util.concurrent.FutureTask.run(FutureTask.java:138)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
        at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
        at java.util.concurrent.FutureTask.run(FutureTask.java:138)
        at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
        at java.lang.Thread.run(Thread.java:619)
Tests run: 1, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 0.207 sec <<< FAILURE!

Results :

Failed tests:
  testServerUnderLoad(org.boni.parallely.service.ConcurrencyTestJig)

Tests run: 1, Failures: 1, Errors: 0, Skipped: 0

[INFO] ------------------------------------------------------------------------
[ERROR] BUILD FAILURE
[INFO] ------------------------------------------------------------------------

This test uses a simple Jig (it will be laughable if I call it a framework) to make such tests look trivial by eliminating thread management code completely.  Let us take a look.

Main outer container of execution of the test jig is a custom ExecutorCompletionService  that neatly wraps the executor of your choice.  It has a mildly overidden submit(Callable<T>) and a completely deprecated submit(Runnable).  Feel free to modify the code to implement submit(Runnable) if that fits your specific test scenario better.  After submitting all your Callables (it is a special type of Callable and we will get to it in a bit), you call test().  This will use its ‘ExecutorCompletionService ’ capabilities to collect all the results from threads and generate a test result summary.  How cool is that!

CompletionServiceForConcurrencyTest.java

class CompletionServiceForConcurrencyTest&lt;T&gt; extends ExecutorCompletionService&lt;Result&lt;T&gt;&gt;{
	private final Log logger = LogFactory.getLog(CompletionServiceForConcurrencyTest.class);
	private int count = 0;
	public CompletionServiceForConcurrencyTest(Executor executor){
		super(executor);
	}
	@Override
	public Future&lt;Result&lt;T&gt;&gt; submit(Callable&lt;Result&lt;T&gt;&gt; task) {
		count = count+1;
		return super.submit(task);
	}
	@Override
	public Future&lt;Result&lt;T&gt;&gt; submit(Runnable task, Result&lt;T&gt; result) {
		throw new RuntimeException(&quot;Testing through runnable submission not supported currently&quot;);
	}
	/**
	 * Packs all the ran test results neatly by providing summary information
	 * and exception logs if any.
	 * @return boolean : TRUE -&gt; All threads ran as expected.
	 */
	public boolean test(){
		List&lt;Result&lt;T&gt;&gt; results = new ArrayList&lt;Result&lt;T&gt;&gt;();
		for (int i = 0 ; i &lt; count; i++){
			try {
				//waits on the CompletionService to make available results
				//as and when a thread completes.  The results are added
				//to a List for verification in the next step.
				results.add(take().get());
			} catch (InterruptedException e) {
			} catch (ExecutionException e) {
			}
		}
		int failedRsults = 0;
		for (Result&lt;T&gt; aResult : results){
			if (aResult.result == null){
				logger.error(aResult, aResult.e);
				failedRsults = failedRsults + 1;
			}
		}
		System.out.println(&quot;Concurrency test summary : TOTAL_THREADS:&quot; + results.size() + &quot;, SUCCESS:&quot; + (results.size() - failedRsults) + &quot;, FAILURE:&quot; + failedRsults );
		return (failedRsults == 0) ? true : false;
	}
}

Next let me introduce you to my Callable.  It is based on the abstract class ConcurrentTestCallable.  It is Generic to accommodate different return types from Callable execution.  It is overiding the method ‘call’  and provides a Result Value.  The Result data object wraps result, exception or a message back from the callable.  Unfortunately in my jig I am making ‘call’ final.  Feel free to change it.

ConcurrentTestCallable.java

abstract class ConcurrentTestCallable&lt;T&gt; implements Callable&lt;Result&lt;T&gt;&gt;{
	Object[] parameters = null;
	public ConcurrentTestCallable(Object[] testInputParameters){
		this.parameters = testInputParameters;
	}
	/**
	 * Implement your execution logic here.  It is supposed to either return
	 * a result of type T or throw an appropriate exception in case of error
	 * scenario.
	 * @return T result of exection
	 * @throws Exception appropriate exception wrapping error condition.
	 */
	abstract T businessLogic() throws Exception;
	/**
	 * Logic to check whether the returned result is as expected.  You have access to the
	 * Input parameters through Object[] parameters.
	 * @param output Result from {@link businessLogic}
	 * @return boolean confirming accuracy of the result or not.
	 * @throws Exception
	 */
	abstract boolean isResultValid(T output) throws Exception;
	public final Result&lt;T&gt; call() throws Exception {
		try {
			T result = businessLogic();
			if (isResultValid(result)){
				return new Result&lt;T&gt;(result, null, &quot;PERFECT MATCH&quot;,parameters);
			}
			return new Result&lt;T&gt;(result,null,&quot;UNEXPECTED RESULTS&quot;,parameters);
		} catch (Exception e) {
			return new Result&lt;T&gt;(null,e,&quot;Exception while executing thread&quot;,parameters);
		}
	}
}

Input parameters for execution of thread is passed in through an Object[] on the constructor.  There are two abstract methods that needs implementation.  The inputParameters are stored in a ‘protected’ local variables.  So those can be accessed from either of these methods.

  1. abstract T businessLogic() throws Exception
  2. abstract boolean isResultValid(T output) throws Exception

When we submit the thread, method ‘call’ will be invoked.  call simply delegates the execution first to ‘T businessLogic()’.  It uses ‘isResultValid(T)’ to validate.  Depending on the results an appropriate ‘Result’ object is returned.  If Result.result is null your thread did not execute as planned.

Result.java

//Generic result carrier to return both
//results and any possible exceptions from Callable (Thread)
class Result&lt;T&gt;{
	T result = null;
	Exception e = null;
	String message = null;
	Object[] inputParameters = null;
	Result(T aResult, Exception e, String message, Object[] inputParameters){
		this.result = aResult;
		this.e = e;
		this.message = message;
		this.inputParameters = inputParameters;
	}
	@Override
	public String toString() {
		return message;
	}
}

In the example discussed in this post I am using a server JumbleServer.jumble() that randomly throws an exception to simulate problem scenarios.  You can access the complete source code from subversion @

svn checkout http://whiteboardjunkie.googlecode.com/svn/trunk/parallelly parallelly-read-only

Edit[1/17/2012]:

To check whether the test jig is flexible as I make it sound like I got down to testing it a bit.  What I did is to write a different piece of code with a different set of responsibilities.  This one is a ‘SweatShopFactory’ that churns out nothing less than ‘Persons’.  Wow! Now that is sinister.

	class Person implements Serializable{
		String name;
		char sex;
		int age;
	}
	class SweatShopServer{
		private final Random random = new Random(System.currentTimeMillis());
		public List&lt;Person&gt; getPersons(char sex, int ageFrom, int ageTo, int count){
			List&lt;Person&gt; persons = new ArrayList&lt;Person&gt;();
			while(persons.size() &lt; count){
				Person aPerson = new Person();
				aPerson.name = UUID.randomUUID().toString();
				aPerson.sex = random.nextBoolean() ? 'F' : 'M';
				aPerson.age = random.nextInt(ageTo);
				if (aPerson.age &gt; ageFrom &amp;&amp; aPerson.age &lt; ageTo &amp;&amp; aPerson.sex == sex){
					//This is where I am rigging the tests to create
					//some random failures;
					if (aPerson.age == 17) aPerson.age = ageTo + 1;
					persons.add(aPerson);
				}
			}
			return persons;
		}
	}
1

To test this class under load I wrote this (and only this) code.

1
	@Test
	public void testPersonsServerUnderLoad(){
		final SweatShopServer sweatshopServer = new SweatShopServer();
		CompletionServiceForConcurrencyTest&lt;List&lt;Person&gt;&gt; resultService = new CompletionServiceForConcurrencyTest&lt;List&lt;Person&gt;&gt;(Executors.newCachedThreadPool());
		final Random random = new Random(System.currentTimeMillis());
		for (int i = 0 ; i &lt; MAX_THREAD_COUNT; i++){
			resultService.submit(
				new ConcurrentTestCallable&lt;List&lt;Person&gt;&gt;(new Object[]{random.nextBoolean() ? 'M' : 'F', 0, random.nextInt(100), random.nextInt(100)}) {
					@Override
					List&lt;Person&gt; businessLogic() throws Exception{
						char sex = ((Character)(parameters[0])).charValue();
						int ageFrom = ((Integer)(parameters[1])).intValue();
						int ageTo = ((Integer)(parameters[2])).intValue();
						int count = ((Integer)(parameters[3])).intValue();
						return
						sweatshopServer.getPersons(	sex, 
													ageFrom, 
													ageTo, 
													count);
					}
					@Override
					boolean isResultValid(List&lt;Person&gt; output) throws Exception {
						char sex = ((Character)(parameters[0])).charValue();
						int ageFrom = ((Integer)(parameters[1])).intValue();
						int ageTo = ((Integer)(parameters[2])).intValue();
						int count = ((Integer)(parameters[3])).intValue();
						Assert.assertNotNull(output);
						Assert.assertTrue(&quot;Expected size:&quot; + count + &quot;, Actual size:&quot; + output.size(), count == output.size());
						for (Person aPerson : output){
							Assert.assertEquals(aPerson.sex,sex);
							Assert.assertTrue(&quot;Expected age &gt; &quot; + ageFrom , aPerson.age &gt; ageFrom);
							Assert.assertTrue(&quot;Expected age &lt; &quot; + ageTo, aPerson.age &lt; ageTo);
						}
						return true;
					}
					
			});
		}
		Assert.assertTrue(resultService.test());
	}

And it worked just the way I thought it would. Perfectly!

-------------------------------------------------------
 T E S T S
-------------------------------------------------------
Running org.boni.parallely.service.ConcurrencyTestJig
Jan 17, 2012 10:50:38 AM org.boni.parallely.service.CompletionServiceForConcurrencyTest test
SEVERE: Exception while executing thread
java.lang.IllegalArgumentException: n must be positive
        at java.util.Random.nextInt(Random.java:250)
Concurrency test summary : TOTAL_THREADS:16, SUCCESS:15, FAILURE:1
        at org.boni.parallely.service.ConcurrencyTestJig$SweatShopServer.getPersons(ConcurrencyTestJig.java:61)
        at org.boni.parallely.service.ConcurrencyTestJig$1.businessLogic(ConcurrencyTestJig.java:87)
        at org.boni.parallely.service.ConcurrencyTestJig$1.businessLogic(ConcurrencyTestJig.java:80)
        at org.boni.parallely.service.ConcurrentTestCallable.call(ConcurrentTestCallable.java:28)
        at org.boni.parallely.service.ConcurrentTestCallable.call(ConcurrentTestCallable.java:5)
        at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
Tests run: 1, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 0.457 sec &lt;&lt;&lt; FAILURE!
        at java.util.concurrent.FutureTask.run(FutureTask.java:138)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
        at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
        at java.util.concurrent.FutureTask.run(FutureTask.java:138)
        at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
        at java.lang.Thread.run(Thread.java:619)

Results :

Failed tests:
  testPersonsServerUnderLoad(org.boni.parallely.service.ConcurrencyTestJig)

Tests run: 1, Failures: 1, Errors: 0, Skipped: 0

Ok, now that is some minimal validation. Now it is time to use it some real code. May be to load test our Image Server :)

2 Thoughts on “Unit Testing Concurrent Execution

  1. “A drop in the ocean” Is only this dot in the center of the Indian Ocean.

  2. My partner and I absolutely love your blog and find the majority of your post’s to be exactly I’m looking for.

    Do you offer guest writers to write content to suit your needs?
    I wouldn’t mind producing a post or elaborating on a number of the subjects you write regarding here. Again, awesome weblog!

Leave a Reply

Your email address will not be published. Required fields are marked *

You may use these HTML tags and attributes: <a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code> <del datetime=""> <em> <i> <q cite=""> <strike> <strong>

Post Navigation