// ChannelHelper class (Java code)

package com.uc.cep.examples;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.osgi.framework.Filter;
import org.osgi.framework.InvalidSyntaxException;
import org.osgi.framework.ServiceReference;

import com.bea.wlevs.ede.api.EventChannel;
import com.bea.wlevs.ede.api.StreamSink;

/**
 * A helper class managing the connection of specialised <code>StreamSink</code> implementations
 * which are modified to store events and report back to the test harness.<p>
 * {@link #start()} looks up the configured channels, detaches the sinks currently attached to
 * each out-channel and attaches a <code>CQLTestEventSink</code> in their place;
 * {@link #stop()} reverses this. The class performs no synchronisation of its own.
 * @author Andrew Upton,  Upton Consulting GmbH 2017
 *
 */
public class ChannelHelper
{
	public enum InChannels {IN_CHANNEL_ONE, IN_CHANNEL_TWO, ETC};
	public enum OutChannels {OUT_CHANNEL_ONE, OUT_CHANNEL_TWO, ETC};
	
	private Map<InChannels, EventChannel> inChannelMap = new HashMap<InChannels, EventChannel>();
	private Map<OutChannels, EventChannel> outChannelMap = new HashMap<OutChannels, EventChannel>();
	/** Production sinks detached from each out-channel, kept so that {@link #stop()} can re-attach all of them. */
	private Map<EventChannel, List<StreamSink>> prodOutChannelMap = new HashMap<EventChannel, List<StreamSink>>();
	private Map<OutChannels, CQLTestEventSink> cqlTestListeners = new HashMap<OutChannels, CQLTestEventSink>();
	
	/**
	 * Looks up the in- and out-channels, detaches the sinks attached to every out-channel
	 * and attaches a fresh <code>CQLTestEventSink</code> to each out-channel.
	 * Intended to be paired with a later call to {@link #stop()}.
	 * @throws Exception if a channel lookup fails or a test sink cannot be attached
	 */
	public void start() throws Exception
	{
		collectInputChannels();
		collectOutputChannels();
		removeProductionChannelListeners();
		createAndAttachTestListeners();
	}
	
	/**
	 * Detaches the test sinks, re-attaches the stored production sinks and forgets all
	 * collected channels. The production sinks are restored even if detaching a test sink
	 * fails, and the internal maps are always cleared.
	 * @throws Exception if a production sink cannot be re-attached
	 */
	public void stop() throws Exception
	{
		try
		{
			removeCQLTestEventListeners();
		}
		finally
		{
			try
			{
				restoreProductionListeners();
			}
			finally
			{
				cqlTestListeners.clear();
				prodOutChannelMap.clear();
				outChannelMap.clear();
				inChannelMap.clear();
			}
		}
	}

	/**
	 * Clears the events held by every attached <code>CQLTestEventSink</code>.
	 * Despite the name, the sinks themselves stay attached to their channels.
	 */
	public void clearAllListeners()
	{
		for (CQLTestEventSink sink : cqlTestListeners.values())
		{
			sink.clearAllEvents();
		}
	}
	
	private void collectInputChannels() throws InvalidSyntaxException
	{
		collectInputChannel("myInChannel1Name", InChannels.IN_CHANNEL_ONE);
		collectInputChannel("myInChannel2Name", InChannels.IN_CHANNEL_TWO);
	}

	private void collectOutputChannels() throws InvalidSyntaxException
	{
		collectOutputChannel("myOutChannel1Name", OutChannels.OUT_CHANNEL_ONE);
		// Second out-channel, mirroring the input side (previously the first channel was collected twice).
		collectOutputChannel("myOutChannel2Name", OutChannels.OUT_CHANNEL_TWO);
	}
	
	private void collectInputChannel(final String channelName, final InChannels inChannelEnum) throws InvalidSyntaxException
	{
		inChannelMap.put(inChannelEnum, collectChannel(channelName));
	}

	private void collectOutputChannel(final String channelName, final OutChannels outChannelEnum) throws InvalidSyntaxException
	{
		outChannelMap.put(outChannelEnum, collectChannel(channelName));
	}
	
	/**
	 * Looks up the channel registered under the given id.
	 * @param channelName value of the <code>id</code> service property to match
	 * @return the matching channel
	 * @throws InvalidSyntaxException if the resulting filter string is not a valid OSGi filter
	 */
	private EventChannel collectChannel(final String channelName) throws InvalidSyntaxException
	{
		// OSGi filters use LDAP syntax, i.e. parentheses rather than braces.
		return getChannel("(id=" + channelName + ")");
	}

	private EventChannel getChannel(final String filter) throws InvalidSyntaxException
	{
		return getService(filter, EventChannel.class);
	}
	
	/**
	 * Query the OSGi BundleContext for the instance of a channel 
	 * @param filter OSGi (LDAP style) filter string selecting the channel service
	 * @param clazz service class under which the channel is registered
	 * @return EventChannel the first service matching the filter
	 * @throws InvalidSyntaxException if the filter string is not a valid OSGi filter
	 * @throws IllegalStateException if no matching service is available
	 */
	private EventChannel getService(final String filter, final Class<EventChannel> clazz) throws InvalidSyntaxException 
	{
		ServiceReference [] sRefs =  LocalBundleActivator.getContext().getServiceReferences(clazz.getName(), filter);
		// getServiceReferences reports "nothing found" as null rather than as an empty array.
		if (sRefs == null || sRefs.length == 0)
		{
			throw new IllegalStateException("No " + clazz.getName() + " service found for filter " + filter);
		}
		EventChannel channel = (EventChannel) LocalBundleActivator.getContext().getService(sRefs[0]);
		if (channel == null)
		{
			throw new IllegalStateException("Service for filter " + filter + " is no longer available");
		}
		return channel;
	}
	
	/**
	 * Cycles through the out-channels, removing the attached StreamSink listeners
	 */
	private void removeProductionChannelListeners()
	{
		for (OutChannels chan : outChannelMap.keySet())
		{
			removeProductionListenersFromChannel(chan);
		}
	}

	/**
	 * Removes every production <code>StreamSink</code> from the given out-channel, 
	 * storing them to one side for later re-attachment to the channel on shutdown.
	 * @param outChanEnum the out-channel whose sinks are to be detached
	 */
	private void removeProductionListenersFromChannel(OutChannels outChanEnum)
	{
		EventChannel chan = getChannelFromMap(outChanEnum);
		
		// Snapshot the sinks first so that the channel is not modified while it is being iterated.
		List<StreamSink> attached = new ArrayList<StreamSink>();
		for (Object prodListener : chan.getEventSinks())
		{
			attached.add((StreamSink)prodListener);
		}
		
		// Append rather than overwrite: two enum values may refer to the same channel instance.
		List<StreamSink> stored = prodOutChannelMap.get(chan);
		if (stored == null)
		{
			stored = new ArrayList<StreamSink>();
			prodOutChannelMap.put(chan, stored);
		}
		
		for (StreamSink prodListener : attached)
		{
			stored.add(prodListener);
			chan.removeEventSink(prodListener);
		}
	}

	/**
	 * Seeks the one and only EventChannel instance based on the <code>OutChannels</code> enum given. 
	 * @param outChanEnum the out-channel key
	 * @return the collected channel, or <code>null</code> if none was collected for the key
	 */
	private EventChannel getChannelFromMap(final OutChannels outChanEnum)
	{
		return outChannelMap.get(outChanEnum);
	}	
	
	/**
	 * Cycles though the available set of out-channels, adding an instance 
	 * of the <code>CQLTestEventSink</code> to each
	 * @throws Exception if a sink cannot be attached
	 */
	private void createAndAttachTestListeners() throws Exception
	{
		for (OutChannels outChanEnum : outChannelMap.keySet())
		{
			createAndAttachTestListener(outChanEnum);
		}
	}

	/**
	 * Creates a new <code>CQLTestEventSink</code> and then attaches this to a production channel
	 * @param outChanEnum the out-channel to attach the new sink to
	 * @throws Exception if the sink cannot be attached
	 */
	private void createAndAttachTestListener(final OutChannels outChanEnum) throws Exception
	{
		CQLTestEventSink eventSink = new CQLTestEventSink(outChanEnum.name()+"Listener");
		outChannelMap.get(outChanEnum).addEventSink(eventSink);
		cqlTestListeners.put(outChanEnum, eventSink);
	}

	private void removeCQLTestEventListeners()
	{
		for (OutChannels outChanEnum : outChannelMap.keySet())
		{
			removeCQLTestListener(outChanEnum);
		}
	}

	private void restoreProductionListeners() throws Exception
	{
		for (OutChannels outChanEnum : outChannelMap.keySet())
		{
			restoreProductionListener(outChanEnum);
		}
	}
	
	/**
	 * Detaches the test sink registered for the given out-channel, if there is one.
	 * @param outChanEnum the out-channel key
	 */
	private void removeCQLTestListener(final OutChannels outChanEnum)
	{
		EventChannel chan = getChannelFromMap(outChanEnum);
		CQLTestEventSink testSink = cqlTestListeners.get(outChanEnum);
		// Either can be missing when start() failed part-way through.
		if (chan != null && testSink != null)
		{
			chan.removeEventSink(testSink);
		}
	}
	
	/**
	 * Re-attaches the production sinks previously detached from the given out-channel.
	 * @param outChanEnum the out-channel key
	 * @throws Exception if a sink cannot be re-attached
	 */
	private void restoreProductionListener(OutChannels outChanEnum) throws Exception
	{
		EventChannel chan = getChannelFromMap(outChanEnum);
		// The store is keyed by channel instance, not by enum; removing the entry
		// prevents a second re-attachment when two enum values share one channel.
		List<StreamSink> prodListeners = prodOutChannelMap.remove(chan);
		if (prodListeners == null)
		{
			return;
		}
		for (StreamSink prodListener : prodListeners)
		{
			chan.addEventSink(prodListener);
		}
	}

	/**
	 * @param inChannel the in-channel key
	 * @return the collected in-channel, or <code>null</code> if none is held for the key
	 */
	public EventChannel getEventChannel(InChannels inChannel)
	{
		return inChannelMap.get(inChannel);
	}
	
	/**
	 * @param outChannel the out-channel key
	 * @return the collected out-channel, or <code>null</code> if none is held for the key
	 */
	public EventChannel getEventChannel(OutChannels outChannel)
	{
		return outChannelMap.get(outChannel);
	}

	/**
	 * @param channel the out-channel key
	 * @return the test sink attached for the key, or <code>null</code> if none is held
	 */
	public CQLTestEventSink getCqlTestEventSink(OutChannels channel)
	{
		return cqlTestListeners.get(channel);
	}
	
	
}