package com.uc.cep.examples;
import java.util.HashMap;
import java.util.Map;
import org.osgi.framework.Filter;
import org.osgi.framework.InvalidSyntaxException;
import org.osgi.framework.ServiceReference;
import com.bea.wlevs.ede.api.EventChannel;
import com.bea.wlevs.ede.api.StreamSink;
/**
* A helper class managing the connection of specialised <code>StreamSink</code> implementations
* which are modified to store events and report back to the test harness.<p>
* @author Andrew Upton, Upton Consulting GmbH 2017
*
*/
public class ChannelHelper
{
public enum InChannels {IN_CHANNEL_ONE, IN_CHANNEL_TWO, ETC};
public enum OutChannels {OUT_CHANNEL_ONE, OUT_CHANNEL_TWO, ETC};
private Map<InChannels, EventChannel> inChannelMap = new HashMap<InChannels, EventChannel>();
private Map<OutChannels, EventChannel> outChannelMap = new HashMap<OutChannels, EventChannel>();
private Map<EventChannel, StreamSink> prodOutChannelMap = new HashMap<EventChannel, StreamSink>();
private Map<OutChannels, CQLTestEventSink> cqlTestListeners = new HashMap<OutChannels, CQLTestEventSink>();
public void start() throws Exception
{
collectInputChannels();
collectOutputChannels();
removeProductionChannelListeners();
createAndAttachTestListeners();
}
public void stop() throws Exception
{
removeCQLTestEventListeners();
restoreProductionListeners();
cqlTestListeners.clear();
outChannelMap.clear();
inChannelMap.clear();
}
public void clearAllListeners()
{
for (CQLTestEventSink sink : cqlTestListeners.values())
{
sink.clearAllEvents();
}
}
private void collectInputChannels() throws InvalidSyntaxException
{
collectInputChannel("myInChannel1Name", InChannels.IN_CHANNEL_ONE);
collectInputChannel("myInChannel2Name", InChannels.IN_CHANNEL_TWO);
}
private void collectOutputChannels() throws InvalidSyntaxException
{
collectOutputChannel("myOutChannel1Name", OutChannels.OUT_CHANNEL_ONE);
collectOutputChannel("myOutChannel1Name", OutChannels.OUT_CHANNEL_ONE);
}
private void collectInputChannel(final String channelName, final InChannels inChannelEnum) throws InvalidSyntaxException
{
EventChannel c = collectChannel(channelName);
inChannelMap.put(inChannelEnum, c);
}
private void collectOutputChannel(final String channelName, final OutChannels inChannelEnum) throws InvalidSyntaxException
{
EventChannel c = collectChannel(channelName);
outChannelMap.put(inChannelEnum, c);
}
private EventChannel collectChannel(final String channelName) throws InvalidSyntaxException
{
StringBuilder sb = new StringBuilder("{id=").append(channelName).append("}");
EventChannel c = getChannel(sb.toString());
return c;
}
private EventChannel getChannel(final String filter) throws InvalidSyntaxException
{
return (EventChannel)getService(filter, EventChannel.class);
}
/**
* Query the OSGi BundleContext for the instance of a channel
* @param filter
* @param clazz
* @return EventChannel
* @throws InvalidSyntaxException
*/
private EventChannel getService(final String filter, final Class<EventChannel> clazz) throws InvalidSyntaxException
{
Filter f = LocalBundleActivator.getContext().createFilter(filter);
ServiceReference [] sRefs = LocalBundleActivator.getContext().getServiceReferences(clazz.getName(), filter);
return (EventChannel) LocalBundleActivator.getContext().getService(sRefs[0]);
}
/**
* Cycles through the out-channels, removing the attached StreamSink listener
*/
private void removeProductionChannelListeners()
{
for (OutChannels chan : outChannelMap.keySet())
{
removeProductionListenersFromChannel(chan);
}
}
/**
* Removes the production <code>StreamSink</code> from each out-channel,
* storing them to one side for later re-attachment to the channel on shutdown.
* @param outChanEnum
*/
private void removeProductionListenersFromChannel(OutChannels outChanEnum)
{
EventChannel chan = getChannelFromMap(outChanEnum);
for (Object prodListener : chan.getEventSinks())
{
prodOutChannelMap.put(chan, (StreamSink)prodListener);
chan.removeEventSink(prodListener);
}
}
/**
* Seeks the one and only EventChannel instance based on the <code>OutChannels</code> enum given.
* @param outChanEnum
* @return
*/
private EventChannel getChannelFromMap(final OutChannels outChanEnum)
{
EventChannel c = outChannelMap.get(outChanEnum);
return (c != null ? c : inChannelMap.get(outChanEnum));
}
/**
* Cycles though the available set of out-channels, adding an instance
* of the <code>CQLTestEventSink</code> to each
* @throws Exception
*/
private void createAndAttachTestListeners() throws Exception
{
for (OutChannels outChanEnum : outChannelMap.keySet())
{
createAndAttachTestListener(outChanEnum);
}
}
/**
* Creates a new <code>CQLTestEventSink</code> and then attaches this to a production channel
* @param outChanEnum
* @throws Exception
*/
private void createAndAttachTestListener(final OutChannels outChanEnum) throws Exception
{
CQLTestEventSink eventSink = new CQLTestEventSink(outChanEnum.name()+"Listener");
outChannelMap.get(outChanEnum).addEventSink(eventSink);
cqlTestListeners.put(outChanEnum, eventSink);
}
private void removeCQLTestEventListeners()
{
for (OutChannels outChanEnum : outChannelMap.keySet())
{
removeCQLTestListener(outChanEnum);
}
}
private void restoreProductionListeners() throws Exception
{
for (OutChannels outChanEnum : outChannelMap.keySet())
{
restoreProductionListener(outChanEnum);
}
}
private void removeCQLTestListener(final OutChannels outChanEnum)
{
EventChannel chan = getChannelFromMap(outChanEnum);
chan.removeEventSink(cqlTestListeners.get(outChanEnum));
}
private void restoreProductionListener(OutChannels outChanEnum) throws Exception
{
EventChannel chan = getChannelFromMap(outChanEnum);
StreamSink prodListener = prodOutChannelMap.get(outChanEnum);
chan.addEventSink(prodListener);
}
public EventChannel getEventChannel(InChannels inChannel)
{
return inChannelMap.get(inChannel);
}
public EventChannel getEventChannel(OutChannels outChannel)
{
return outChannelMap.get(outChannel);
}
public CQLTestEventSink getCqlTestEventSink(OutChannels channel)
{
return cqlTestListeners.get(channel);
}
}