Example CQL test harness java code

The attached files are Java code that outlines how to write a CQL test harness.

The main testing file is the UCCQLTester class, where you add your test cases to probe the CQL.

It depends on helper utilities in the package “com.uc.cep.examples”, most of which you shouldn’t need to modify.

BadResult class java code

ChannelHelper class java code

CQLTestEventSink class java code

LocalBundleActivator class java code

TestBase class java code

TestResult class java code

Included mainly so that the code compiles is an example canonical object class, found at MyEvent class java code. Normally this class would live in the main production project (and package), and all you would do in the test harness is get a reference to that class and populate it with the properties you know are going to be examined by the CQL in the EPN.

TestBase class java code

package com.uc.cep.examples;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.List;

import org.apache.log4j.Logger;
import org.junit.Assert;

import com.bea.wlevs.ede.api.EventChannel;
import com.uc.cep.examples.ChannelHelper.OutChannels;

/**
 * An example snippet to allow the hijacking of channel sources and listeners.
 * <p>
 * This class mimics JUnit-like behaviour in that it:
 * <ul>
 * <li>1. runs the &quot;test&quot; prefixed methods from any subclass</li>
 * <li>2. asserts whether the event went to the correct channel</li>
 * <li>3. checks whether any &quot;bad&quot; events were found</li>
 * </ul>
 * 
 * @author Andrew Upton, Upton Consulting GmbH 2017
 * 
 */
public abstract class TestBase
{
	private static Logger LOGGER = Logger.getLogger(TestBase.class);

	/**
	 * 
	 * @throws Exception
	 */
	public void run() throws Exception
	{
		ChannelHelper chanHelper = LocalBundleActivator.getChannelHelper();
		chanHelper.start();

		TestResult result = new TestResult();

		// getMethods() returns public methods only, so "test..." methods must be public
		Method[] methods = this.getClass().getMethods();

		for (Method method : methods)
		{
			if (method.getName().startsWith("test"))
			{
				setUp(method.getName()); // mimic the JUnit setUp() behaviour

				try
				{
					method.invoke(this); // test methods take no arguments
					result.addTestedOK();
				}
				catch (InvocationTargetException e)
				{
					result.addFailed(method.getName(), e);
				}
				catch (Exception e)
				{
					result.addErrored(method.getName(), e);
				}

				tearDown();
			}
			else
			{
				// ignore
			}
		}

		LOGGER.info(result.report());

		chanHelper.stop();
	}

	private void setUp(String name)
	{
		LocalBundleActivator.getChannelHelper().clearAllListeners();
	}

	private void tearDown()
	{
		LocalBundleActivator.getChannelHelper().clearAllListeners();
	}

	// ====================================== Results =====================================
	protected void checkListeners(List<OutChannels> outChannelsToScan)
	{
		waitFor(1000);

		checkGoodEvents(outChannelsToScan);

		checkForBadEvents();
	}

	protected void waitFor(final int waitInMillisec)
	{
		try
		{
			Thread.sleep(waitInMillisec);
		}
		catch (InterruptedException e)
		{
			// Restore the interrupt flag so that callers can still see the interruption
			Thread.currentThread().interrupt();
			LOGGER.error("Interrupted while waiting for events:", e);
		}
	}

	protected void checkGoodEvents(List<OutChannels> outChannelsToScan)
	{
		OutChannels[] outChans = OutChannels.values();
		for (int i = 0; i < outChans.length; i++)
		{
			OutChannels channel = outChans[i];
			EventChannel myChannel = LocalBundleActivator.getChannelHelper().getEventChannel(channel);
			if (myChannel == null)
			{
				continue; // placeholder enum values such as ETC are never collected
			}
			String name = myChannel.getId();
			if (outChannelsToScan.contains(channel))
			{
				StringBuilder sb = new StringBuilder("EventChannel: ").append(name).append(" is missing an event");
				Assert.assertEquals(sb.toString(), 1,
						LocalBundleActivator.getChannelHelper().getCqlTestEventSink(channel).getGoodEvents().size());
			}
			else
			{
				StringBuilder sb = new StringBuilder("EventChannel: ").append(name).append(
						" has received an event unexpectedly");
				Assert.assertEquals(sb.toString(), 0,
						LocalBundleActivator.getChannelHelper().getCqlTestEventSink(channel).getGoodEvents().size());
			}
		}
	}

	private void checkForBadEvents()
	{
		for (OutChannels channel : OutChannels.values())
		{
			CQLTestEventSink sink = LocalBundleActivator.getChannelHelper().getCqlTestEventSink(channel);
			if (sink == null)
			{
				continue; // no test listener attached, e.g. placeholder enum values such as ETC
			}
			Assert.assertEquals("There are bad events on channel " + channel, 0, sink.getBadEvents().size());
		}
	}
}
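
As a rough, self-contained sketch of the reflection technique that run() relies on, the snippet below discovers public "test"-prefixed methods and sorts the outcomes into ok, failed and errored. The class names SampleTests and ReflectiveRunnerSketch are made up for illustration and are not part of the harness; the point to notice is that anything thrown inside a test body arrives wrapped in an InvocationTargetException, so the real reason is found in getCause().

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a TestBase subclass; not part of the harness. */
class SampleTests
{
	public void testPasses() { /* nothing to check */ }

	public void testFails() { throw new AssertionError("expected 1 but was 0"); }

	public void helper() { /* ignored: name does not start with "test" */ }
}

class ReflectiveRunnerSketch
{
	/** Runs every public no-arg "test..." method and returns one line per outcome. */
	static List<String> runAll(Object target)
	{
		List<String> outcomes = new ArrayList<String>();
		for (Method method : target.getClass().getMethods())
		{
			if (!method.getName().startsWith("test") || method.getParameterTypes().length != 0)
			{
				continue;
			}
			try
			{
				method.invoke(target);
				outcomes.add(method.getName() + ": ok");
			}
			catch (InvocationTargetException e)
			{
				// The test body itself threw; the real cause is wrapped inside
				outcomes.add(method.getName() + ": failed (" + e.getCause().getMessage() + ")");
			}
			catch (IllegalAccessException e)
			{
				// The method could not be called at all
				outcomes.add(method.getName() + ": errored (" + e + ")");
			}
		}
		return outcomes;
	}

	public static void main(String[] args)
	{
		// The order of getMethods() is unspecified, so the lines may print in any order
		for (String line : runAll(new SampleTests()))
		{
			System.out.println(line);
		}
	}
}
```

Because getMethods() makes no promise about ordering, tests written this way should not depend on running in any particular sequence.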

LocalBundleActivator class java code

package com.uc.cep.examples;

import org.osgi.framework.BundleActivator;
import org.osgi.framework.BundleContext;

/**
 * An OSGi BundleActivator implementation which initialises and starts the testing.  
 * @author Andrew Upton,  Upton Consulting GmbH 2017
 *
 */
public class LocalBundleActivator implements BundleActivator
{
	private static BundleContext context;
	private static ChannelHelper channelHelper;
	private static TestBase tester;

	public static BundleContext getContext()
	{
		return context;
	}
	
	public static ChannelHelper getChannelHelper()
	{
		return channelHelper;
	}
	
	@Override
	public void start(BundleContext bundleContext) throws Exception
	{
		LocalBundleActivator.context = bundleContext; // static field, so no "this"
		
		channelHelper = new ChannelHelper();
	}

	@Override
	public void stop(BundleContext bundleContext) throws Exception
	{
		channelHelper = null;
		
		LocalBundleActivator.context = null;
	}

	public static TestBase getTester()
	{
		return tester;
	}

	public static void setTester(TestBase tester)
	{
		LocalBundleActivator.tester = tester;
	}

}

CQLTestEventSink class java code

package com.uc.cep.examples;

import java.util.ArrayList;
import java.util.List;

import com.bea.wlevs.ede.api.EventRejectedException;
import com.bea.wlevs.ede.api.StreamSink;

public class CQLTestEventSink implements StreamSink
{
	private String name;
	private List<MyEvent> goodEvents = new ArrayList<MyEvent>();
	private List<Object> badEvents = new ArrayList<Object>();
	
	public CQLTestEventSink(final String name)
	{
		super();
		this.name = name;
	}
	
	@Override
	public void onInsertEvent(Object event) throws EventRejectedException
	{
		if (event instanceof MyEvent) // true for MyEvent and its subclasses, so the cast below is safe
		{
			goodEvents.add((MyEvent)event);
		}
		else
		{
			badEvents.add(event);
		}
	}

	public void clearAllEvents()
	{
		goodEvents.clear();
		badEvents.clear();
	}

	public List<MyEvent> getGoodEvents()
	{
		return new ArrayList<MyEvent>(goodEvents);
	}
	
	public List<Object> getBadEvents()
	{
		return new ArrayList<Object>(badEvents);
	}
}
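
A note on type checks in a sink like this one: Class.isAssignableFrom reads in the opposite direction to instanceof, which makes it easy to get backwards. The sketch below compares the two on a small event hierarchy; BaseEvent, TradeEvent and UrgentTradeEvent are hypothetical classes invented for the illustration.

```java
/** Hypothetical event hierarchy, for illustration only. */
class BaseEvent { }
class TradeEvent extends BaseEvent { }
class UrgentTradeEvent extends TradeEvent { }

class TypeCheckSketch
{
	/** True when the runtime class of event is TradeEvent or one of its SUPERtypes. */
	static boolean receiverIsEventClass(Object event)
	{
		return event.getClass().isAssignableFrom(TradeEvent.class);
	}

	/** True when event is a TradeEvent or one of its SUBtypes, i.e. safe to cast to TradeEvent. */
	static boolean instanceOfCheck(Object event)
	{
		return event instanceof TradeEvent;
	}

	public static void main(String[] args)
	{
		Object[] samples = { new Object(), new BaseEvent(), new TradeEvent(), new UrgentTradeEvent() };
		for (Object sample : samples)
		{
			System.out.println(sample.getClass().getSimpleName()
					+ " receiverIsEventClass=" + receiverIsEventClass(sample)
					+ " instanceof=" + instanceOfCheck(sample));
		}
	}
}
```

The first form accepts a plain Object (which a cast to TradeEvent would then reject with a ClassCastException) and refuses a genuine subclass, so instanceof, or TradeEvent.class.isAssignableFrom(event.getClass()), is the safer way to sort good events from bad ones.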

ChannelHelper class java code

package com.uc.cep.examples;

import java.util.HashMap;
import java.util.Map;

import org.osgi.framework.Filter;
import org.osgi.framework.InvalidSyntaxException;
import org.osgi.framework.ServiceReference;

import com.bea.wlevs.ede.api.EventChannel;
import com.bea.wlevs.ede.api.StreamSink;

/**
 * A helper class managing the connection of specialised <code>StreamSink</code> implementations
 * which are modified to store events and report back to the test harness.<p>
 * @author Andrew Upton,  Upton Consulting GmbH 2017
 *
 */
public class ChannelHelper
{
	public enum InChannels {IN_CHANNEL_ONE, IN_CHANNEL_TWO, ETC};
	public enum OutChannels {OUT_CHANNEL_ONE, OUT_CHANNEL_TWO, ETC};
	
	private Map<InChannels, EventChannel> inChannelMap = new HashMap<InChannels, EventChannel>();
	private Map<OutChannels, EventChannel> outChannelMap = new HashMap<OutChannels, EventChannel>();
	private Map<EventChannel, StreamSink> prodOutChannelMap = new HashMap<EventChannel, StreamSink>();
	private Map<OutChannels, CQLTestEventSink> cqlTestListeners = new HashMap<OutChannels, CQLTestEventSink>();
	
	public void start() throws Exception
	{
		collectInputChannels();
		collectOutputChannels();
		removeProductionChannelListeners();
		createAndAttachTestListeners();
	}
	
	public void stop() throws Exception
	{
		removeCQLTestEventListeners();
		restoreProductionListeners();
		cqlTestListeners.clear();
		outChannelMap.clear();
		inChannelMap.clear();
	}

	public void clearAllListeners()
	{
		for (CQLTestEventSink sink : cqlTestListeners.values())
		{
			sink.clearAllEvents();
		}
	}
	
	private void collectInputChannels() throws InvalidSyntaxException
	{
		collectInputChannel("myInChannel1Name", InChannels.IN_CHANNEL_ONE);
		collectInputChannel("myInChannel2Name", InChannels.IN_CHANNEL_TWO);
	}

	private void collectOutputChannels() throws InvalidSyntaxException
	{
		collectOutputChannel("myOutChannel1Name", OutChannels.OUT_CHANNEL_ONE);
		collectOutputChannel("myOutChannel2Name", OutChannels.OUT_CHANNEL_TWO);
	}
	
	private void collectInputChannel(final String channelName, final InChannels inChannelEnum) throws InvalidSyntaxException
	{
		EventChannel c = collectChannel(channelName);
		inChannelMap.put(inChannelEnum, c);
	}

	private void collectOutputChannel(final String channelName, final OutChannels outChannelEnum) throws InvalidSyntaxException
	{
		EventChannel c = collectChannel(channelName);
		outChannelMap.put(outChannelEnum, c);
	}
	
	private EventChannel collectChannel(final String channelName) throws InvalidSyntaxException
	{
		// OSGi filters use LDAP-style syntax: parentheses, not braces
		StringBuilder sb = new StringBuilder("(id=").append(channelName).append(")");
		EventChannel c = getChannel(sb.toString());
		return c;
	}

	private EventChannel getChannel(final String filter) throws InvalidSyntaxException
	{
		return (EventChannel)getService(filter, EventChannel.class);
	}
	
	/**
	 * Query the OSGi BundleContext for the instance of a channel 
	 * @param filter
	 * @param clazz
	 * @return EventChannel
	 * @throws InvalidSyntaxException
	 */
	private EventChannel getService(final String filter, final Class<EventChannel> clazz) throws InvalidSyntaxException 
	{
		// getServiceReferences() validates the filter itself and returns null when nothing matches
		ServiceReference [] sRefs =  LocalBundleActivator.getContext().getServiceReferences(clazz.getName(), filter);
		if (sRefs == null || sRefs.length == 0)
		{
			throw new IllegalStateException("No advertised channel matches filter " + filter);
		}
		return (EventChannel) LocalBundleActivator.getContext().getService(sRefs[0]);
	}
	
	/**
	 * Cycles through the out-channels, removing the attached StreamSink listener
	 */
	private void removeProductionChannelListeners()
	{
		for (OutChannels chan : outChannelMap.keySet())
		{
			removeProductionListenersFromChannel(chan);
		}
	}

	/**
	 * Removes the production <code>StreamSink</code> from each out-channel, 
	 * storing them to one side for later re-attachment to the channel on shutdown.
	 * @param outChanEnum
	 */
	private void removeProductionListenersFromChannel(OutChannels outChanEnum)
	{
		EventChannel chan = getChannelFromMap(outChanEnum);
		for (Object prodListener : chan.getEventSinks())
		{
			prodOutChannelMap.put(chan, (StreamSink)prodListener);
			chan.removeEventSink(prodListener);
		}
	}

	/**
	 * Looks up the EventChannel instance collected for the given <code>OutChannels</code> enum. 
	 * @param outChanEnum
	 * @return the channel, or <code>null</code> if none was collected for that enum value
	 */
	private EventChannel getChannelFromMap(final OutChannels outChanEnum)
	{
		// inChannelMap is keyed by InChannels, so it can never hold an OutChannels key
		return outChannelMap.get(outChanEnum);
	}
	
	/**
	 * Cycles through the available set of out-channels, adding an instance 
	 * of the <code>CQLTestEventSink</code> to each
	 * @throws Exception
	 */
	private void createAndAttachTestListeners() throws Exception
	{
		for (OutChannels outChanEnum : outChannelMap.keySet())
		{
			createAndAttachTestListener(outChanEnum);
		}
	}

	/**
	 * Creates a new <code>CQLTestEventSink</code> and then attaches this to a production channel
	 * @param outChanEnum
	 * @throws Exception
	 */
	private void createAndAttachTestListener(final OutChannels outChanEnum) throws Exception
	{
		CQLTestEventSink eventSink = new CQLTestEventSink(outChanEnum.name()+"Listener");
		outChannelMap.get(outChanEnum).addEventSink(eventSink);
		cqlTestListeners.put(outChanEnum, eventSink);
	}

	private void removeCQLTestEventListeners()
	{
		for (OutChannels outChanEnum : outChannelMap.keySet())
		{
			removeCQLTestListener(outChanEnum);
		}
	}

	private void restoreProductionListeners() throws Exception
	{
		for (OutChannels outChanEnum : outChannelMap.keySet())
		{
			restoreProductionListener(outChanEnum);
		}
	}
	
	private void removeCQLTestListener(final OutChannels outChanEnum)
	{
		EventChannel chan = getChannelFromMap(outChanEnum);
		chan.removeEventSink(cqlTestListeners.get(outChanEnum));
	}
	
	private void restoreProductionListener(OutChannels outChanEnum) throws Exception
	{
		EventChannel chan = getChannelFromMap(outChanEnum);
		// prodOutChannelMap is keyed by channel, not by enum
		StreamSink prodListener = prodOutChannelMap.get(chan);
		if (prodListener != null)
		{
			chan.addEventSink(prodListener);
		}
	}

	public EventChannel getEventChannel(InChannels inChannel)
	{
		return inChannelMap.get(inChannel);
	}
	
	public EventChannel getEventChannel(OutChannels outChannel)
	{
		return outChannelMap.get(outChannel);
	}

	public CQLTestEventSink getCqlTestEventSink(OutChannels channel)
	{
		return cqlTestListeners.get(channel);
	}
	
	
}
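
The channel lookup above hinges on an OSGi filter string. OSGi filters follow the LDAP search filter syntax, so each comparison sits in parentheses and the characters \, *, ( and ) must be escaped with a backslash when they appear in a value. Below is a minimal sketch of building such a filter safely, assuming (as collectChannel does) that advertised channels can be matched on an "id" service property; ChannelFilterSketch and its methods are hypothetical helpers, not part of OSGi or OCEP.

```java
class ChannelFilterSketch
{
	/** Escapes the characters that are special inside an OSGi filter value. */
	static String escape(String value)
	{
		StringBuilder sb = new StringBuilder();
		for (char c : value.toCharArray())
		{
			if (c == '\\' || c == '*' || c == '(' || c == ')')
			{
				sb.append('\\');
			}
			sb.append(c);
		}
		return sb.toString();
	}

	/** Builds a filter matching services whose "id" property equals channelId (property name is an assumption). */
	static String idFilter(String channelId)
	{
		return "(id=" + escape(channelId) + ")";
	}

	public static void main(String[] args)
	{
		System.out.println(idFilter("myOutChannel1Name"));
		System.out.println(idFilter("weird*name"));
	}
}
```

The resulting string is what would be handed to BundleContext.getServiceReferences as its filter argument.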

BadResult class java code

package com.uc.cep.examples;

public class BadResult
{
	private String testName; 
	private Exception exception; 
		
	public BadResult(String testName, Exception e)
	{
		super();
		this.testName = testName; 
		this.exception = e;
	}

	public String getTestName()
	{
		return testName;
	}

	public void setTestName(String testName)
	{
		this.testName = testName;
	}

	public Exception getException()
	{
		return exception;
	}

	public void setException(Exception exception)
	{
		this.exception = exception;
	}
	
	@Override
	public String toString()
	{
		return new StringBuilder("TestName:")
				.append(testName)
				.append(", Exception: ")
				.append(exception)
				.toString();
	}

}

TestResult class java code

package com.uc.cep.examples;

import java.util.ArrayList;
import java.util.List;

/**
 * Class to hold the results of the test
 * @author Andrew Upton,  Upton Consulting GmbH 2017
 *
 */
public class TestResult
{
	private int successful = 0;
	private List<BadResult> failedTests = new ArrayList<BadResult>();
	private List<BadResult> erroredTests = new ArrayList<BadResult>();
	
	public void addTestedOK()
	{
		successful++;
	}
	
	public void addFailed(final String testName, final Exception e)
	{
		failedTests.add(new BadResult(testName, e));
	}
	
	public void addErrored(final String testName, final Exception e)
	{
		erroredTests.add(new BadResult(testName, e));
	}
	
	public String report()
	{
		return format().toString();
	}

	private StringBuilder format()
	{
		StringBuilder sb = new StringBuilder("=================== CQL test results ===================\n");
		sb.append("\t\tSuccessful\t\t").append(successful).append("\n");
		sb.append("\t\tFailed\t\t");
		if (failedTests.size() > 0)
		{
			sb.append("\n");
			for (BadResult failed : failedTests)
			{
				sb.append(failed.toString()).append("\n");
			}
		}
		else
		{
			sb.append("none\n");
		}

		sb.append("\t\tErrored\t\t");
		if (erroredTests.size() > 0)
		{
			sb.append("\n");
			for (BadResult errored : erroredTests)
			{
				sb.append(errored.toString()).append("\n");
			}
		}
		else
		{
			sb.append("none\n");
		}

		return sb;
	}
}

MyEvent class java code

package com.uc.cep.examples;

/**
 * An example event class, containing properties that would be used by the EPN 
 * and tested by the CQL rules.<p>
 * Nominally, this class would reside in the production code (and project) for the application, 
 * but for demonstration I have included it here, also to avoid compilation errors.
 *  
 * @author Andrew Upton, Upton Consulting GmbH 2017
 * */
public class MyEvent
{
	private String myProperty1;
	private String myProperty2;
	private String myProperty3;
	private String myProperty4;
	public String getMyProperty1()
	{
		return myProperty1;
	}
	public void setMyProperty1(String myProperty1)
	{
		this.myProperty1 = myProperty1;
	}
	public String getMyProperty2()
	{
		return myProperty2;
	}
	public void setMyProperty2(String myProperty2)
	{
		this.myProperty2 = myProperty2;
	}
	public String getMyProperty3()
	{
		return myProperty3;
	}
	public void setMyProperty3(String myProperty3)
	{
		this.myProperty3 = myProperty3;
	}
	public String getMyProperty4()
	{
		return myProperty4;
	}
	public void setMyProperty4(String myProperty4)
	{
		this.myProperty4 = myProperty4;
	}
	
	@Override
	public String toString()
	{	
		StringBuilder sb = new StringBuilder("MyEvent properties:");
		sb.append("[MyProperty1=").append(myProperty1).append("]");
		sb.append("[MyProperty2=").append(myProperty2).append("]");
		sb.append("[MyProperty3=").append(myProperty3).append("]");
		sb.append("[MyProperty4=").append(myProperty4).append("]");
		return sb.toString();
	}
}


UCCQLTester java code

package com.uc.cep.examples.cqltest;
import java.util.ArrayList;
import java.util.List;
import org.apache.log4j.Logger;
import com.uc.cep.examples.ChannelHelper.InChannels;
import com.uc.cep.examples.ChannelHelper.OutChannels;
import com.uc.cep.examples.LocalBundleActivator;
import com.uc.cep.examples.MyEvent;
import com.uc.cep.examples.TestBase;

/**
* This class is an example of how you would implement CQL tests.<p>
* Wire this class into the EPN Spring context, calling the "run" method:
*
* <p><code>&lt;bean id="tester" class="com.uc.cep.examples.cqltest.UCCQLTester" init-method="run"&gt;</code><p>
*
* Each CQL test is a method starting with "test", so you can implement any number
* of tests here as you see fit.
*
* @author Andrew Upton,  Upton Consulting GmbH 2017
*
*/
public class UCCQLTester extends TestBase
{
	private static Logger LOGGER = Logger.getLogger(UCCQLTester.class);

	public UCCQLTester()
	{
		LocalBundleActivator.setTester(this);
	}

	public void testSomething() throws Exception
	{
		MyEvent event = new MyEvent();

		// configure the event to do something deterministic to exercise the CQL rules in the target CQL processor
		// ...

		// send the canonical event into the EPN
		LocalBundleActivator.getChannelHelper().getEventChannel(InChannels.IN_CHANNEL_ONE).sendInsertEvent(event);

		/* remember to add this verification code at the end of each test.
		 * It should be customised to the output channels you wish to inspect
		 */
		List<OutChannels> outChannelsToScan = new ArrayList<OutChannels>();
		outChannelsToScan.add(OutChannels.OUT_CHANNEL_ONE);
		checkListeners(outChannelsToScan);
	}

	/*... Do more tests here */

}

How to unit test CQL statements in an Oracle Complex Event Processor version 11.1

Although the Oracle Complex Event Processor (OCEP) version 11.1 is a very powerful tool, it doesn’t come with a mechanism for testing CQL statements in isolation from the rest of the application.

A while ago a client wanted me to implement an application that leveraged the CQL language shipped with Oracle Complex Event Processor version 11.1.1.7 to route important financial messages around an internal business domain.

This put a lot of emphasis on the routing rules themselves, so there was a need for rules testing. Common sense dictated that a semi-automated test harness was the best way forward: quick, robotic therefore reliable, robotic therefore deterministic, robotic therefore executable at any time, robotic therefore … you get the picture.

Naturally, the rest of the application’s Java code was covered by unit and unit integration tests, which were easy to write and were exercised, well, robotically.

The wrinkle in our robotic CQL testing concept was that it required a fully operational Oracle Complex Event Processor to be running, so that the rules fire in something that approximates production. As we all know from TDD and other testing approaches, that puts our CQL testing concept well beyond unit test scope, where mocking resources is the norm.

The solution? Hack a running CEP, replacing the internal publishers and consumers around each CQL processor within the EPN with dummy producers and consumers. The dummy producer then injects a pre-constructed canonical object into the channel feeding the CQL processor in question, while dummy consumers are attached to the outbound channels after the CQL processor.

The stage is then set to inject and retrieve crafted canonical objects designed to probe a particular CQL rule.

Easy, right?

Well, in fact it’s not as hard as you think. Dynamically changing the behaviour of an application by INJECTING new code at runtime is not a new idea (see aspect-oriented programming and its ilk), but being able to inject code through a web interface, by dropping another application into an application container, is new indeed. And, I suspect, may be useful in AI operations; more on this in a later blog.

OSGi with resource sharing is the key here. For those unfamiliar with OCEP, it is based on a standard WebLogic application container, but is flavoured with an OSGi application management layer.

What is this OSGi thing? Wikipedia tells us that “OSGi (Open Service Gateway Initiative) is a Java framework for developing and deploying modular software programs and libraries”. Not a very useful explanation. Practically, OSGi allows multiple programs to share resources through public interfaces, meaning that an app can be sandboxed for all intents and purposes, except when you don’t want it to be. This is “advertising” in OCEP/OSGi speak.

In this case, when you set a channel to be “advertised”, another application can get access to that channel via a discovery mechanism which leverages the subscription mechanism in the OSGi BundleContext, and can then reprogram the channel to use a set of publishers and consumers different from the one originally deployed.

OK. Baffling. Here is the howto:

  1. within the META-INF/spring/[myapplicationcontext.xml], set advertise="true" on every wlevs:channel around the CQL processors in your application
  2. create a new application, the one which contains all of the testing artefacts: dummy publishers and consumers, test canonical object construction code, and test data that is converted into the test canonical objects
  3. finally, write a utility in the new application which gets references to all live channels, reconfigures them to use the new publishers and listeners, injects the test canonical objects and grades the routing outcome by detecting which listeners should and should not have received the canonical object.
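
To make step 3 a little more concrete, here is a minimal in-memory sketch of the swap-inject-restore cycle. Everything in it is a stand-in invented for illustration: Sink, FakeChannel, RecordingSink and ChannelSwapSketch are not OCEP classes, and the CQL processor between the in-channel and the out-channels is collapsed into a single channel to keep the idea visible.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical one-method sink; stands in for the real StreamSink. */
interface Sink
{
	void onEvent(Object event);
}

/** Hypothetical in-memory channel; stands in for an advertised channel. */
class FakeChannel
{
	private final List<Sink> sinks = new ArrayList<Sink>();

	void addSink(Sink sink) { sinks.add(sink); }

	void removeSink(Sink sink) { sinks.remove(sink); }

	/** Returns a copy, so callers can remove sinks while iterating over the result. */
	List<Sink> getSinks() { return new ArrayList<Sink>(sinks); }

	void send(Object event)
	{
		for (Sink sink : getSinks())
		{
			sink.onEvent(event);
		}
	}
}

/** Test sink that records everything it receives. */
class RecordingSink implements Sink
{
	final List<Object> received = new ArrayList<Object>();

	public void onEvent(Object event) { received.add(event); }
}

class ChannelSwapSketch
{
	/** Swaps in a recording sink, sends one event, restores the production sinks, returns what was captured. */
	static List<Object> probe(FakeChannel channel, Object event)
	{
		List<Sink> production = channel.getSinks();
		for (Sink sink : production)
		{
			channel.removeSink(sink); // set the production listeners to one side
		}

		RecordingSink recorder = new RecordingSink();
		channel.addSink(recorder);
		try
		{
			channel.send(event);
		}
		finally
		{
			// Always put the channel back the way it was found
			channel.removeSink(recorder);
			for (Sink sink : production)
			{
				channel.addSink(sink);
			}
		}
		return recorder.received;
	}
}
```

The try/finally is the design choice worth copying: whatever happens during the test, the production listeners go back onto the channel.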

Again, easy right?

In my code repository you can find code snippets which you can use, free, to reconstruct the test harness. Sadly I can’t give you cut-and-paste code, as the implementation has to be customised to your real application test cases. But you will get the idea.

The client took some convincing that it was worth spending the 5 or so days building this test harness, but the results speak for themselves 4 years on. Given the 150+ routing pathways through the application and the many associated corner cases, being able to prove and re-prove the rule behaviour during each release has eliminated hundreds of hours of troubleshooting and, by avoiding production incidents, thousands of dollars in commercial side effects compared to the previous non-OCEP implementation.

Happy testing!