1
2
3
4
5
6
7
8
9
10
11 package org.mule.providers;
12
13 import org.mule.MuleManager;
14 import org.mule.MuleRuntimeException;
15 import org.mule.config.ThreadingProfile;
16 import org.mule.config.i18n.CoreMessages;
17 import org.mule.impl.AlreadyInitialisedException;
18 import org.mule.impl.DefaultExceptionStrategy;
19 import org.mule.impl.ImmutableMuleEndpoint;
20 import org.mule.impl.MuleSessionHandler;
21 import org.mule.impl.internal.notifications.ConnectionNotification;
22 import org.mule.providers.service.TransportFactory;
23 import org.mule.providers.service.TransportServiceDescriptor;
24 import org.mule.providers.service.TransportServiceException;
25 import org.mule.routing.filters.WildcardFilter;
26 import org.mule.umo.MessagingException;
27 import org.mule.umo.UMOComponent;
28 import org.mule.umo.UMOEvent;
29 import org.mule.umo.UMOException;
30 import org.mule.umo.UMOMessage;
31 import org.mule.umo.endpoint.UMOEndpoint;
32 import org.mule.umo.endpoint.UMOEndpointURI;
33 import org.mule.umo.endpoint.UMOImmutableEndpoint;
34 import org.mule.umo.lifecycle.DisposeException;
35 import org.mule.umo.lifecycle.Initialisable;
36 import org.mule.umo.lifecycle.InitialisationException;
37 import org.mule.umo.manager.UMOServerNotification;
38 import org.mule.umo.manager.UMOWorkManager;
39 import org.mule.umo.provider.ConnectorException;
40 import org.mule.umo.provider.DispatchException;
41 import org.mule.umo.provider.UMOConnectable;
42 import org.mule.umo.provider.UMOConnector;
43 import org.mule.umo.provider.UMOMessageAdapter;
44 import org.mule.umo.provider.UMOMessageDispatcher;
45 import org.mule.umo.provider.UMOMessageDispatcherFactory;
46 import org.mule.umo.provider.UMOMessageReceiver;
47 import org.mule.umo.provider.UMOSessionHandler;
48 import org.mule.umo.provider.UMOStreamMessageAdapter;
49 import org.mule.umo.transformer.UMOTransformer;
50 import org.mule.util.ClassUtils;
51 import org.mule.util.CollectionUtils;
52 import org.mule.util.ObjectNameHelper;
53 import org.mule.util.ObjectUtils;
54 import org.mule.util.PropertiesUtils;
55 import org.mule.util.StringUtils;
56 import org.mule.util.concurrent.NamedThreadFactory;
57 import org.mule.util.concurrent.WaitableBoolean;
58
59 import java.beans.ExceptionListener;
60 import java.io.InputStream;
61 import java.io.OutputStream;
62 import java.util.ArrayList;
63 import java.util.Collections;
64 import java.util.HashMap;
65 import java.util.Iterator;
66 import java.util.List;
67 import java.util.Map;
68 import java.util.Properties;
69
70 import javax.resource.spi.work.WorkEvent;
71 import javax.resource.spi.work.WorkListener;
72
73 import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
74 import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentMap;
75 import edu.emory.mathcs.backport.java.util.concurrent.ScheduledExecutorService;
76 import edu.emory.mathcs.backport.java.util.concurrent.ScheduledThreadPoolExecutor;
77 import edu.emory.mathcs.backport.java.util.concurrent.ThreadFactory;
78 import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
79 import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
80 import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicReference;
81
82 import org.apache.commons.beanutils.BeanUtils;
83 import org.apache.commons.logging.Log;
84 import org.apache.commons.logging.LogFactory;
85 import org.apache.commons.pool.KeyedPoolableObjectFactory;
86 import org.apache.commons.pool.impl.GenericKeyedObjectPool;
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113 public abstract class AbstractConnector
114 implements UMOConnector, ExceptionListener, UMOConnectable, WorkListener
115 {
116
117
118
119 public static final int DEFAULT_NUM_CONCURRENT_TX_RECEIVERS = 4;
120
121
122
123
124 protected final Log logger = LogFactory.getLog(getClass());
125
126
127
128
129 protected final AtomicBoolean started = new AtomicBoolean(false);
130
131
132
133
134 protected final AtomicBoolean initialised = new AtomicBoolean(false);
135
136
137
138
139 protected volatile String name;
140
141
142
143
144 protected volatile ExceptionListener exceptionListener;
145
146
147
148
149 protected final AtomicBoolean disposed = new AtomicBoolean(false);
150
151
152
153
154 protected final AtomicBoolean disposing = new AtomicBoolean(false);
155
156
157
158
159 protected volatile UMOMessageDispatcherFactory dispatcherFactory;
160
161
162
163
164 protected final GenericKeyedObjectPool dispatchers = new GenericKeyedObjectPool();
165
166
167
168
169 protected final ConcurrentMap receivers = new ConcurrentHashMap();
170
171
172
173
174 private volatile ThreadingProfile dispatcherThreadingProfile = MuleManager.getConfiguration()
175 .getMessageDispatcherThreadingProfile();
176
177
178
179
180 private volatile ThreadingProfile receiverThreadingProfile = MuleManager.getConfiguration()
181 .getMessageReceiverThreadingProfile();
182
183
184
185
186 protected volatile boolean createMultipleTransactedReceivers = true;
187
188
189
190
191 protected volatile int numberOfConcurrentTransactedReceivers = DEFAULT_NUM_CONCURRENT_TX_RECEIVERS;
192
193
194
195
196
197 protected volatile UMOTransformer defaultInboundTransformer;
198
199
200
201
202
203 protected volatile UMOTransformer defaultOutboundTransformer;
204
205
206
207
208
209 protected volatile UMOTransformer defaultResponseTransformer;
210
211 protected volatile ConnectionStrategy connectionStrategy;
212
213 protected final WaitableBoolean connected = new WaitableBoolean(false);
214
215 protected final WaitableBoolean connecting = new WaitableBoolean(false);
216
217
218
219
220
221 protected final WaitableBoolean startOnConnect = new WaitableBoolean(false);
222
223
224
225
226
227 private volatile boolean enableMessageEvents;
228
229 private final List supportedProtocols;
230
231
232
233
234 private final AtomicReference
235
236
237
238
239 private final AtomicReference
240
241
242
243
244 private final AtomicReference
245
246
247
248
249 protected volatile TransportServiceDescriptor serviceDescriptor;
250
251
252
253
254
255 protected volatile Properties serviceOverrides;
256
257
258
259
260
261 protected volatile UMOSessionHandler sessionHandler = new MuleSessionHandler();
262
263
264 public AbstractConnector()
265 {
266 super();
267
268
269 exceptionListener = new DefaultExceptionStrategy();
270 connectionStrategy = MuleManager.getConfiguration().getConnectionStrategy();
271 enableMessageEvents = MuleManager.getConfiguration().isEnableMessageEvents();
272
273
274 supportedProtocols = new ArrayList();
275 supportedProtocols.add(getProtocol().toLowerCase());
276
277
278
279
280 dispatchers.setTestOnBorrow(false);
281 dispatchers.setTestOnReturn(true);
282 }
283
284
285 public String getName()
286 {
287 return name;
288 }
289
290
291 public void setName(String newName)
292 {
293 if (newName == null)
294 {
295 throw new IllegalArgumentException(CoreMessages.objectIsNull("Connector name").toString());
296 }
297
298 if (logger.isDebugEnabled())
299 {
300 logger.debug("Set UMOConnector name to: " + newName);
301 }
302
303 name = newName;
304 }
305
306
307 public final synchronized void initialise() throws InitialisationException
308 {
309 if (initialised.get())
310 {
311 throw new AlreadyInitialisedException("Connector '" + getName() + "'", this);
312 }
313
314 if (logger.isInfoEnabled())
315 {
316 logger.info("Initialising: " + this);
317 }
318
319
320 this.initFromServiceDescriptor();
321
322
323
324
325 this.disposeDispatchers();
326 this.disposeReceivers();
327
328 this.doInitialise();
329
330 if (exceptionListener instanceof Initialisable)
331 {
332 ((Initialisable) exceptionListener).initialise();
333 }
334
335 initialised.set(true);
336 }
337
338
339 public final synchronized void startConnector() throws UMOException
340 {
341 this.checkDisposed();
342
343 if (!this.isStarted())
344 {
345 if (!this.isConnected())
346 {
347 startOnConnect.set(true);
348
349
350 connectionStrategy.connect(this);
351
352 return;
353 }
354
355 if (logger.isInfoEnabled())
356 {
357 logger.info("Starting: " + this);
358 }
359
360
361 ScheduledExecutorService currentScheduler = (ScheduledExecutorService) scheduler.get();
362 if (currentScheduler == null || currentScheduler.isShutdown())
363 {
364 scheduler.set(this.getScheduler());
365 }
366
367 this.doStart();
368 started.set(true);
369
370 if (receivers != null)
371 {
372 for (Iterator iterator = receivers.values().iterator(); iterator.hasNext();)
373 {
374 UMOMessageReceiver mr = (UMOMessageReceiver) iterator.next();
375 if (logger.isDebugEnabled())
376 {
377 logger.debug("Starting receiver on endpoint: " + mr.getEndpoint().getEndpointURI());
378 }
379 mr.start();
380 }
381 }
382
383 if (logger.isInfoEnabled())
384 {
385 logger.info("Started: " + this);
386 }
387 }
388 }
389
390
391 public boolean isStarted()
392 {
393 return started.get();
394 }
395
396
397 public final synchronized void stopConnector() throws UMOException
398 {
399 if (this.isDisposed())
400 {
401 return;
402 }
403
404 if (this.isStarted())
405 {
406 if (logger.isInfoEnabled())
407 {
408 logger.info("Stopping: " + this);
409 }
410
411
412 ((ScheduledExecutorService) scheduler.get()).shutdown();
413
414 this.doStop();
415 started.set(false);
416
417
418
419 if (receivers != null)
420 {
421 for (Iterator iterator = receivers.values().iterator(); iterator.hasNext();)
422 {
423 UMOMessageReceiver mr = (UMOMessageReceiver) iterator.next();
424 if (logger.isDebugEnabled())
425 {
426 logger.debug("Stopping receiver on endpoint: " + mr.getEndpoint().getEndpointURI());
427 }
428 mr.stop();
429 }
430 }
431 }
432
433 if (this.isConnected())
434 {
435 try
436 {
437 this.disconnect();
438 }
439 catch (Exception e)
440 {
441
442 logger.error("Failed to disconnect: " + e.getMessage(), e);
443 }
444 }
445
446
447 scheduler.set(null);
448
449
450
451
452 this.initialised.set(false);
453
454 if (logger.isInfoEnabled())
455 {
456 logger.info("Stopped: " + this);
457 }
458 }
459
460
461 public final synchronized void dispose()
462 {
463 disposing.set(true);
464
465 if (logger.isInfoEnabled())
466 {
467 logger.info("Disposing: " + this);
468 }
469
470 try
471 {
472 this.stopConnector();
473 }
474 catch (UMOException e)
475 {
476 logger.warn("Failed to stop during shutdown: " + e.getMessage(), e);
477 }
478
479 this.disposeReceivers();
480 this.disposeDispatchers();
481 this.disposeWorkManagers();
482
483 this.doDispose();
484 disposed.set(true);
485
486 if (logger.isInfoEnabled())
487 {
488 logger.info("Disposed: " + this);
489 }
490 }
491
492 protected void disposeWorkManagers()
493 {
494 logger.debug("Disposing dispatcher work manager");
495 UMOWorkManager workManager = (UMOWorkManager) dispatcherWorkManager.get();
496 if (workManager != null)
497 {
498 workManager.dispose();
499 }
500 dispatcherWorkManager.set(null);
501
502 logger.debug("Disposing receiver work manager");
503 workManager = (UMOWorkManager) receiverWorkManager.get();
504 if (workManager != null)
505 {
506 workManager.dispose();
507 }
508 receiverWorkManager.set(null);
509 }
510
511 protected void disposeReceivers()
512 {
513 if (receivers != null)
514 {
515 logger.debug("Disposing Receivers");
516
517 for (Iterator iterator = receivers.values().iterator(); iterator.hasNext();)
518 {
519 UMOMessageReceiver receiver = (UMOMessageReceiver) iterator.next();
520
521 try
522 {
523 this.destroyReceiver(receiver, receiver.getEndpoint());
524 }
525 catch (Throwable e)
526 {
527
528 logger.error("Failed to destroy receiver: " + receiver, e);
529 }
530 }
531
532 receivers.clear();
533 logger.debug("Receivers Disposed");
534 }
535 }
536
537 protected void disposeDispatchers()
538 {
539 if (dispatchers != null)
540 {
541 logger.debug("Disposing Dispatchers");
542 dispatchers.clear();
543 logger.debug("Dispatchers Disposed");
544 }
545 }
546
547
548 public boolean isDisposed()
549 {
550 return disposed.get();
551 }
552
553
554 public void handleException(Exception exception)
555 {
556 if (exceptionListener == null)
557 {
558 throw new MuleRuntimeException(
559 CoreMessages.exceptionOnConnectorNotExceptionListener(this.getName()), exception);
560 }
561 else
562 {
563 exceptionListener.exceptionThrown(exception);
564 }
565 }
566
567
568 public void exceptionThrown(Exception e)
569 {
570 handleException(e);
571 }
572
573
574
575
576
577 public ExceptionListener getExceptionListener()
578 {
579 return exceptionListener;
580 }
581
582
583
584
585
586 public void setExceptionListener(ExceptionListener listener)
587 {
588 exceptionListener = listener;
589 }
590
591
592
593
594 public UMOMessageDispatcherFactory getDispatcherFactory()
595 {
596 return dispatcherFactory;
597 }
598
599
600
601
602 public void setDispatcherFactory(UMOMessageDispatcherFactory dispatcherFactory)
603 {
604 KeyedPoolableObjectFactory poolFactory;
605
606 if (dispatcherFactory instanceof KeyedPoolableObjectFactory)
607 {
608 poolFactory = (KeyedPoolableObjectFactory) dispatcherFactory;
609 }
610 else
611 {
612
613 poolFactory = new KeyedPoolMessageDispatcherFactoryAdapter(dispatcherFactory);
614 }
615
616 this.dispatchers.setFactory(poolFactory);
617
618
619
620 this.dispatcherFactory = dispatcherFactory;
621 }
622
623
624
625
626
627
628
629 public int getMaxDispatchersActive()
630 {
631 return this.dispatchers.getMaxActive();
632 }
633
634
635
636
637
638
639
640 public void setMaxDispatchersActive(int maxActive)
641 {
642 this.dispatchers.setMaxActive(maxActive);
643
644 this.dispatchers.setMaxIdle(maxActive);
645 }
646
647 private UMOMessageDispatcher getDispatcher(UMOImmutableEndpoint endpoint) throws UMOException
648 {
649 this.checkDisposed();
650
651 if (endpoint == null)
652 {
653 throw new IllegalArgumentException("Endpoint must not be null");
654 }
655
656 if (!this.supportsProtocol(endpoint.getConnector().getProtocol()))
657 {
658 throw new IllegalArgumentException(
659 CoreMessages.connectorSchemeIncompatibleWithEndpointScheme(this.getProtocol(),
660 endpoint.getEndpointURI().toString()).getMessage());
661 }
662
663 try
664 {
665 if (logger.isDebugEnabled())
666 {
667 logger.debug("Borrowing a dispatcher for endpoint: " + endpoint.getEndpointURI());
668 }
669 }
670 catch (Exception ex)
671 {
672 throw new ConnectorException(CoreMessages.connectorCausedError(), this, ex);
673 }
674
675 UMOMessageDispatcher dispatcher = null;
676
677 try
678 {
679 dispatcher = (UMOMessageDispatcher) dispatchers.borrowObject(endpoint);
680 return dispatcher;
681 }
682 catch (Exception ex)
683 {
684 throw new ConnectorException(CoreMessages.connectorCausedError(), this, ex);
685 }
686 finally
687 {
688 try
689 {
690 if (logger.isDebugEnabled())
691 {
692 logger.debug("Borrowed dispatcher: " + ObjectUtils.toString(dispatcher, "null"));
693 }
694 }
695 catch (Exception ex)
696 {
697 throw new ConnectorException(CoreMessages.connectorCausedError(), this, ex);
698 }
699 }
700 }
701
702 private void returnDispatcher(UMOImmutableEndpoint endpoint, UMOMessageDispatcher dispatcher)
703 {
704 if (endpoint != null && dispatcher != null)
705 {
706 try
707 {
708 if (logger.isDebugEnabled())
709 {
710 logger.debug("Returning dispatcher for endpoint: " + endpoint.getEndpointURI() + " = "
711 + ObjectUtils.toString(dispatcher, "null"));
712 }
713 }
714 catch (Exception ex)
715 {
716
717
718
719 }
720 finally
721 {
722 try
723 {
724 dispatchers.returnObject(endpoint, dispatcher);
725 }
726 catch (Exception ex)
727 {
728
729
730 }
731 }
732 }
733 }
734
735 protected void checkDisposed() throws DisposeException
736 {
737 if (this.isDisposed())
738 {
739 throw new DisposeException(CoreMessages.cannotUseDisposedConnector(), this);
740 }
741 }
742
743
744 public UMOMessageReceiver registerListener(UMOComponent component, UMOEndpoint endpoint) throws Exception
745 {
746 if (endpoint == null)
747 {
748 throw new IllegalArgumentException("The endpoint cannot be null when registering a listener");
749 }
750
751 if (component == null)
752 {
753 throw new IllegalArgumentException("The component cannot be null when registering a listener");
754 }
755
756 UMOEndpointURI endpointUri = endpoint.getEndpointURI();
757 if (endpointUri == null)
758 {
759 throw new ConnectorException(CoreMessages.endpointIsNullForListener(), this);
760 }
761
762 logger.info("Registering listener: " + component.getDescriptor().getName() + " on endpointUri: "
763 + endpointUri.toString());
764
765 UMOMessageReceiver receiver = this.getReceiver(component, endpoint);
766
767 if (receiver != null)
768 {
769 throw new ConnectorException(CoreMessages.listenerAlreadyRegistered(endpointUri), this);
770 }
771 else
772 {
773 receiver = this.createReceiver(component, endpoint);
774 Object receiverKey = getReceiverKey(component, endpoint);
775 receiver.setReceiverKey(receiverKey.toString());
776 receivers.put(receiverKey, receiver);
777
778 }
779
780 return receiver;
781 }
782
783
784
785
786
787
788
789
790 protected Object getReceiverKey(UMOComponent component, UMOEndpoint endpoint)
791 {
792 return StringUtils.defaultIfEmpty(endpoint.getEndpointURI().getFilterAddress(), endpoint
793 .getEndpointURI().getAddress());
794 }
795
796
797 public final void unregisterListener(UMOComponent component, UMOEndpoint endpoint) throws Exception
798 {
799 if (component == null)
800 {
801 throw new IllegalArgumentException(
802 "The component must not be null when you unregister a listener");
803 }
804
805 if (endpoint == null)
806 {
807 throw new IllegalArgumentException("The endpoint must not be null when you unregister a listener");
808 }
809
810 UMOEndpointURI endpointUri = endpoint.getEndpointURI();
811 if (endpointUri == null)
812 {
813 throw new IllegalArgumentException(
814 "The endpointUri must not be null when you unregister a listener");
815 }
816
817 if (logger.isInfoEnabled())
818 {
819 logger.info("Removing listener on endpointUri: " + endpointUri);
820 }
821
822 if (receivers != null && !receivers.isEmpty())
823 {
824 UMOMessageReceiver receiver = (UMOMessageReceiver) receivers.remove(this.getReceiverKey(component,
825 endpoint));
826 if (receiver != null)
827 {
828 this.destroyReceiver(receiver, endpoint);
829 }
830 }
831 }
832
833
834
835
836
837
838 public ThreadingProfile getDispatcherThreadingProfile()
839 {
840 return dispatcherThreadingProfile;
841 }
842
843
844
845
846
847
848
849 public void setDispatcherThreadingProfile(ThreadingProfile dispatcherThreadingProfile)
850 {
851 this.dispatcherThreadingProfile = dispatcherThreadingProfile;
852 }
853
854
855
856
857
858
859 public ThreadingProfile getReceiverThreadingProfile()
860 {
861 return receiverThreadingProfile;
862 }
863
864
865
866
867
868
869
870 public void setReceiverThreadingProfile(ThreadingProfile receiverThreadingProfile)
871 {
872 this.receiverThreadingProfile = receiverThreadingProfile;
873 }
874
875 protected void destroyReceiver(UMOMessageReceiver receiver, UMOEndpoint endpoint) throws Exception
876 {
877 receiver.dispose();
878 }
879
/**
 * Subclass hook called by initialise() after the connector has been set up
 * from its service descriptor and previously registered dispatchers and
 * receivers have been cleared.
 *
 * @throws InitialisationException if the connector cannot be initialised
 */
protected abstract void doInitialise() throws InitialisationException;
881
882
883
884
/**
 * Subclass hook called by dispose() after the connector has been stopped and
 * its receivers, dispatchers and work managers have been disposed, but
 * before the disposed flag is set.
 */
protected abstract void doDispose();
886
887
888
889
890
891
/**
 * Subclass hook called by startConnector() once the connector is connected
 * and a usable scheduler is in place, before the started flag is set and the
 * receivers are started.
 *
 * @throws UMOException if the connector cannot be started
 */
protected abstract void doStart() throws UMOException;
893
894
895
896
897
898
/**
 * Subclass hook called by stopConnector() after the scheduler has been shut
 * down, before the started flag is cleared and the receivers are stopped.
 *
 * @throws UMOException if the connector cannot be stopped
 */
protected abstract void doStop() throws UMOException;
900
901
902
903
904
905
906 public UMOTransformer getDefaultInboundTransformer()
907 {
908 if (defaultInboundTransformer != null)
909 {
910 try
911 {
912 return (UMOTransformer) defaultInboundTransformer.clone();
913 }
914 catch (CloneNotSupportedException e)
915 {
916
917 logger.error("Failed to clone default Inbound transformer");
918 }
919 }
920
921 return null;
922 }
923
924
925
926
927
928
929
930 public void setDefaultInboundTransformer(UMOTransformer defaultInboundTransformer)
931 {
932 this.defaultInboundTransformer = defaultInboundTransformer;
933 }
934
935
936
937
938
939
940 public UMOTransformer getDefaultResponseTransformer()
941 {
942 if (defaultResponseTransformer != null)
943 {
944 try
945 {
946 return (UMOTransformer) defaultResponseTransformer.clone();
947 }
948 catch (CloneNotSupportedException e)
949 {
950
951 logger.error("Failed to clone default Outbound transformer");
952 }
953 }
954
955 return null;
956 }
957
958
959
960
961
962
963 public UMOTransformer getDefaultOutboundTransformer()
964 {
965 if (defaultOutboundTransformer != null)
966 {
967 try
968 {
969 return (UMOTransformer) defaultOutboundTransformer.clone();
970 }
971 catch (CloneNotSupportedException e)
972 {
973
974 logger.error("Failed to clone default Outbound transformer");
975 }
976 }
977
978 return null;
979 }
980
981
982
983
984
985
986
987 public void setDefaultOutboundTransformer(UMOTransformer defaultOutboundTransformer)
988 {
989 this.defaultOutboundTransformer = defaultOutboundTransformer;
990 }
991
992
993
994
995
996
997
998 public void setDefaultResponseTransformer(UMOTransformer defaultResponseTransformer)
999 {
1000 this.defaultResponseTransformer = defaultResponseTransformer;
1001 }
1002
1003
1004
1005
1006
1007
1008 public ReplyToHandler getReplyToHandler()
1009 {
1010 return new DefaultReplyToHandler(defaultResponseTransformer);
1011 }
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024 public void fireNotification(UMOServerNotification notification)
1025 {
1026 MuleManager.getInstance().fireNotification(notification);
1027 }
1028
1029
1030
1031
1032
1033
1034 public ConnectionStrategy getConnectionStrategy()
1035 {
1036
1037
1038
1039 try
1040 {
1041 return (ConnectionStrategy) BeanUtils.cloneBean(connectionStrategy);
1042 }
1043 catch (Exception e)
1044 {
1045 throw new MuleRuntimeException(CoreMessages.failedToClone("connectionStrategy"), e);
1046 }
1047 }
1048
1049
1050
1051
1052
1053
1054 public void setConnectionStrategy(ConnectionStrategy connectionStrategy)
1055 {
1056 this.connectionStrategy = connectionStrategy;
1057 }
1058
1059
1060 public boolean isDisposing()
1061 {
1062 return disposing.get();
1063 }
1064
1065
1066 public boolean isRemoteSyncEnabled()
1067 {
1068 return false;
1069 }
1070
1071 public UMOMessageReceiver getReceiver(UMOComponent component, UMOEndpoint endpoint)
1072 {
1073 return (UMOMessageReceiver) receivers.get(this.getReceiverKey(component, endpoint));
1074 }
1075
1076
1077
1078
1079
1080
1081 public Map getReceivers()
1082 {
1083 return Collections.unmodifiableMap(receivers);
1084 }
1085
1086 public UMOMessageReceiver lookupReceiver(String key)
1087 {
1088 if (key != null)
1089 {
1090 return (UMOMessageReceiver) receivers.get(key);
1091 }
1092 else
1093 {
1094 throw new IllegalArgumentException("Receiver key must not be null");
1095 }
1096 }
1097
1098 public UMOMessageReceiver[] getReceivers(String wildcardExpression)
1099 {
1100 WildcardFilter filter = new WildcardFilter(wildcardExpression);
1101 filter.setCaseSensitive(false);
1102
1103 List found = new ArrayList();
1104
1105 for (Iterator iterator = receivers.entrySet().iterator(); iterator.hasNext();)
1106 {
1107 Map.Entry e = (Map.Entry) iterator.next();
1108 if (filter.accept(e.getKey()))
1109 {
1110 found.add(e.getValue());
1111 }
1112 }
1113
1114 return (UMOMessageReceiver[]) CollectionUtils.toArrayOfComponentType(found,
1115 UMOMessageReceiver.class);
1116 }
1117
1118
/**
 * Connects this connector. Does nothing if it is already connected.
 * <p>
 * The method distinguishes two situations by means of the {@code connecting}
 * flag. If the flag is not set, it is set and the connection attempt is
 * handed to the connection strategy, after which the method returns. If the
 * flag is already set, doConnect() is called, the connector is marked as
 * connected, a CONNECTION_CONNECTED notification is fired and the connector
 * is either started (when startOnConnect is set) or its receivers are
 * connected.
 * NOTE(review): presumably the connection strategy calls back into this
 * method, which is how the second situation is reached - confirm against the
 * ConnectionStrategy implementation.
 * <p>
 * On any failure both flags are cleared and a CONNECTION_FAILED notification
 * is fired before the exception is propagated.
 *
 * @throws DisposeException if the connector has been disposed
 * @throws Exception a ConnectException or FatalConnectException as thrown, or
 *             a ConnectException wrapping any other failure of the connect
 *             attempt; exceptions from starting the connector or connecting
 *             the receivers afterwards are propagated unchanged
 */
public void connect() throws Exception
{
    this.checkDisposed();

    if (connected.get())
    {
        return;
    }

    try
    {
        // a connect attempt is already in progress: do the actual connect
        if (connecting.get())
        {
            this.doConnect();
        }

        // no attempt in progress yet: mark one as started and delegate to the strategy
        if (connecting.compareAndSet(false, true))
        {
            if (logger.isDebugEnabled())
            {
                logger.debug("Connecting: " + this);
            }

            connectionStrategy.connect(this);

            logger.info("Connected: " + getConnectionDescription());

            // the flags and the notification are not handled on this path
            return;
        }

        // only reached when the connecting flag was already set on entry
        connected.set(true);
        connecting.set(false);

        this.fireNotification(new ConnectionNotification(this, getConnectEventId(),
            ConnectionNotification.CONNECTION_CONNECTED));
    }
    catch (Exception e)
    {
        // reset both flags so that a later attempt starts from scratch
        connected.set(false);
        connecting.set(false);

        this.fireNotification(new ConnectionNotification(this, getConnectEventId(),
            ConnectionNotification.CONNECTION_FAILED));

        if (e instanceof ConnectException || e instanceof FatalConnectException)
        {
            // already a connect-specific exception: rethrow as is
            throw e;
        }
        else
        {
            throw new ConnectException(e, this);
        }
    }

    if (startOnConnect.get())
    {
        // a start was requested while the connector was not connected
        this.startConnector();
    }
    else
    {
        for (Iterator iterator = receivers.values().iterator(); iterator.hasNext();)
        {
            UMOMessageReceiver receiver = (UMOMessageReceiver) iterator.next();
            if (logger.isDebugEnabled())
            {
                logger.debug("Connecting receiver on endpoint: "
                                + receiver.getEndpoint().getEndpointURI());
            }
            receiver.connect();
        }
    }
}
1202
1203
1204 public void disconnect() throws Exception
1205 {
1206 startOnConnect.set(this.isStarted());
1207
1208 this.fireNotification(new ConnectionNotification(this, getConnectEventId(),
1209 ConnectionNotification.CONNECTION_DISCONNECTED));
1210
1211 connected.set(false);
1212
1213 try
1214 {
1215 this.doDisconnect();
1216 }
1217 finally
1218 {
1219 this.stopConnector();
1220 }
1221
1222 logger.info("Disconnected: " + this.getConnectionDescription());
1223 }
1224
1225
1226 public String getConnectionDescription()
1227 {
1228 return this.toString();
1229 }
1230
1231
1232 public final boolean isConnected()
1233 {
1234 return connected.get();
1235 }
1236
1237
1238
1239
1240
1241
/**
 * Subclass hook that establishes the connection. Called by connect() when a
 * connect attempt is already flagged as in progress.
 *
 * @throws Exception if the connection cannot be established
 */
protected abstract void doConnect() throws Exception;
1243
1244
1245
1246
1247
1248
1249
/**
 * Subclass hook that tears down the connection. Called by disconnect() after
 * the connected flag has been cleared; the connector is stopped afterwards
 * regardless of the outcome.
 *
 * @throws Exception if disconnecting fails
 */
protected abstract void doDisconnect() throws Exception;
1251
1252
1253
1254
1255
1256
1257 protected String getConnectEventId()
1258 {
1259 return getName();
1260 }
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271 public boolean isCreateMultipleTransactedReceivers()
1272 {
1273 return createMultipleTransactedReceivers;
1274 }
1275
1276
1277
1278
1279
1280
1281 public void setCreateMultipleTransactedReceivers(boolean createMultipleTransactedReceivers)
1282 {
1283 this.createMultipleTransactedReceivers = createMultipleTransactedReceivers;
1284 }
1285
1286
1287
1288
1289
1290
1291
1292 public int getNumberOfConcurrentTransactedReceivers()
1293 {
1294 return numberOfConcurrentTransactedReceivers;
1295 }
1296
1297
1298
1299
1300
1301 public void setNumberOfConcurrentTransactedReceivers(int count)
1302 {
1303 numberOfConcurrentTransactedReceivers = count;
1304 }
1305
1306
1307
1308
1309
1310 public boolean isEnableMessageEvents()
1311 {
1312 return enableMessageEvents;
1313 }
1314
1315
1316
1317
1318
1319
1320
1321 public void setEnableMessageEvents(boolean enableMessageEvents)
1322 {
1323 this.enableMessageEvents = enableMessageEvents;
1324 }
1325
1326
1327