package com.uc.cep.examples; import java.util.HashMap; import java.util.Map; import org.osgi.framework.Filter; import org.osgi.framework.InvalidSyntaxException; import org.osgi.framework.ServiceReference; import com.bea.wlevs.ede.api.EventChannel; import com.bea.wlevs.ede.api.StreamSink; /** * A helper class managing the connection of specialised <code>StreamSink</code> implementations * which are modified to store events and report back to the test harness.<p> * @author Andrew Upton, Upton Consulting GmbH 2017 * */ public class ChannelHelper { public enum InChannels {IN_CHANNEL_ONE, IN_CHANNEL_TWO, ETC}; public enum OutChannels {OUT_CHANNEL_ONE, OUT_CHANNEL_TWO, ETC}; private Map<InChannels, EventChannel> inChannelMap = new HashMap<InChannels, EventChannel>(); private Map<OutChannels, EventChannel> outChannelMap = new HashMap<OutChannels, EventChannel>(); private Map<EventChannel, StreamSink> prodOutChannelMap = new HashMap<EventChannel, StreamSink>(); private Map<OutChannels, CQLTestEventSink> cqlTestListeners = new HashMap<OutChannels, CQLTestEventSink>(); public void start() throws Exception { collectInputChannels(); collectOutputChannels(); removeProductionChannelListeners(); createAndAttachTestListeners(); } public void stop() throws Exception { removeCQLTestEventListeners(); restoreProductionListeners(); cqlTestListeners.clear(); outChannelMap.clear(); inChannelMap.clear(); } public void clearAllListeners() { for (CQLTestEventSink sink : cqlTestListeners.values()) { sink.clearAllEvents(); } } private void collectInputChannels() throws InvalidSyntaxException { collectInputChannel("myInChannel1Name", InChannels.IN_CHANNEL_ONE); collectInputChannel("myInChannel2Name", InChannels.IN_CHANNEL_TWO); } private void collectOutputChannels() throws InvalidSyntaxException { collectOutputChannel("myOutChannel1Name", OutChannels.OUT_CHANNEL_ONE); collectOutputChannel("myOutChannel1Name", OutChannels.OUT_CHANNEL_ONE); } private void collectInputChannel(final String channelName, final InChannels inChannelEnum) throws InvalidSyntaxException { EventChannel c = collectChannel(channelName); inChannelMap.put(inChannelEnum, c); } private void collectOutputChannel(final String channelName, final OutChannels inChannelEnum) throws InvalidSyntaxException { EventChannel c = collectChannel(channelName); outChannelMap.put(inChannelEnum, c); } private EventChannel collectChannel(final String channelName) throws InvalidSyntaxException { StringBuilder sb = new StringBuilder("{id=").append(channelName).append("}"); EventChannel c = getChannel(sb.toString()); return c; } private EventChannel getChannel(final String filter) throws InvalidSyntaxException { return (EventChannel)getService(filter, EventChannel.class); } /** * Query the OSGi BundleContext for the instance of a channel * @param filter * @param clazz * @return EventChannel * @throws InvalidSyntaxException */ private EventChannel getService(final String filter, final Class<EventChannel> clazz) throws InvalidSyntaxException { Filter f = LocalBundleActivator.getContext().createFilter(filter); ServiceReference [] sRefs = LocalBundleActivator.getContext().getServiceReferences(clazz.getName(), filter); return (EventChannel) LocalBundleActivator.getContext().getService(sRefs[0]); } /** * Cycles through the out-channels, removing the attached StreamSink listener */ private void removeProductionChannelListeners() { for (OutChannels chan : outChannelMap.keySet()) { removeProductionListenersFromChannel(chan); } } /** * Removes the production <code>StreamSink</code> from each out-channel, * storing them to one side for later re-attachment to the channel on shutdown. * @param outChanEnum */ private void removeProductionListenersFromChannel(OutChannels outChanEnum) { EventChannel chan = getChannelFromMap(outChanEnum); for (Object prodListener : chan.getEventSinks()) { prodOutChannelMap.put(chan, (StreamSink)prodListener); chan.removeEventSink(prodListener); } } /** * Seeks the one and only EventChannel instance based on the <code>OutChannels</code> enum given. * @param outChanEnum * @return */ private EventChannel getChannelFromMap(final OutChannels outChanEnum) { EventChannel c = outChannelMap.get(outChanEnum); return (c != null ? c : inChannelMap.get(outChanEnum)); } /** * Cycles though the available set of out-channels, adding an instance * of the <code>CQLTestEventSink</code> to each * @throws Exception */ private void createAndAttachTestListeners() throws Exception { for (OutChannels outChanEnum : outChannelMap.keySet()) { createAndAttachTestListener(outChanEnum); } } /** * Creates a new <code>CQLTestEventSink</code> and then attaches this to a production channel * @param outChanEnum * @throws Exception */ private void createAndAttachTestListener(final OutChannels outChanEnum) throws Exception { CQLTestEventSink eventSink = new CQLTestEventSink(outChanEnum.name()+"Listener"); outChannelMap.get(outChanEnum).addEventSink(eventSink); cqlTestListeners.put(outChanEnum, eventSink); } private void removeCQLTestEventListeners() { for (OutChannels outChanEnum : outChannelMap.keySet()) { removeCQLTestListener(outChanEnum); } } private void restoreProductionListeners() throws Exception { for (OutChannels outChanEnum : outChannelMap.keySet()) { restoreProductionListener(outChanEnum); } } private void removeCQLTestListener(final OutChannels outChanEnum) { EventChannel chan = getChannelFromMap(outChanEnum); chan.removeEventSink(cqlTestListeners.get(outChanEnum)); } private void restoreProductionListener(OutChannels outChanEnum) throws Exception { EventChannel chan = getChannelFromMap(outChanEnum); StreamSink prodListener = prodOutChannelMap.get(outChanEnum); chan.addEventSink(prodListener); } public EventChannel getEventChannel(InChannels inChannel) { return inChannelMap.get(inChannel); } public EventChannel getEventChannel(OutChannels outChannel) { return outChannelMap.get(outChannel); } public CQLTestEventSink getCqlTestEventSink(OutChannels channel) { return cqlTestListeners.get(channel); } }