1 /*
2 * Replica is published under the terms
3 * of the Apache Software License.
4 */
5 package replica.group.jgroups;
6
7 import java.io.Serializable;
8 import java.util.Iterator;
9 import java.util.List;
10 import java.util.Vector;
11 import java.util.logging.Logger;
12
13 import org.jgroups.Channel;
14 import org.jgroups.ChannelClosedException;
15 import org.jgroups.ChannelException;
16 import org.jgroups.ChannelFactory;
17 import org.jgroups.ChannelNotConnectedException;
18 import org.jgroups.MembershipListener;
19 import org.jgroups.Message;
20 import org.jgroups.MessageListener;
21 import org.jgroups.View;
22 import org.jgroups.blocks.PullPushAdapter;
23 import org.jgroups.stack.IpAddress;
24
25 import replica.group.Address;
26 import replica.group.CommunicationException;
27 import replica.group.GroupManager;
28 import replica.group.event.GroupListener;
29 import replica.group.event.MessageEvent;
30 import replica.group.event.SuspectEvent;
31 import replica.group.event.ViewEvent;
32
33 /***
34 * Implementation of a group communications services provider using the
35 * jgroups.org code.
36 *
37 * @author Pedro Costa
38 * @author Helder Silva
39 * @since 19/Jan/2004
40 */
41 public class JGroupsGroupManager implements GroupManager {
42
43 static Logger logger = Logger.getLogger(JGroupsGroupManager.class.getName());
44
45 String groupName;
46 ChannelFactory channelFactory;
47 String channelProperties;
48 Channel channel;
49
50 List groupListeneres = new Vector();
51
52 PullPushAdapter block;
53
54 /***
55 * Default constructor.
56 */
57 public JGroupsGroupManager() {
58 super();
59 }
60
61 /***
62 * Adds a new listener for Group Events.
63 * @see replica.group.GroupManager#addListener(replica.group.event.GroupListener)
64 */
65 public void addListener(GroupListener listener) {
66 logger.fine("Adding new GroupListener.");
67
68 groupListeneres.add( listener );
69 }
70
71 /***
72 * Return the name of the group under management.
73 *
74 * @see replica.group.GroupManager#getGroupName()
75 */
76 public String getGroupName() {
77 return groupName;
78 }
79
80 /***
81 * Return a list of the current group members addresses.
82 *
83 * @see replica.group.GroupManager#getGroupView()
84 */
85 public List getGroupView() {
86
87 if( getChannel() == null )
88 throw new IllegalStateException("You are not connected to any group.");
89
90 View view = getChannel().getView();
91
92 return convertViewListFromJGroups( view );
93 }
94
95 /***
96 * Return the local address
97 *
98 * @see {@link GroupManager#getLocalManager()}
99 */
100 public Address getLocalAddress(){
101 return convertJGroupsIpAddressToIpAddress( (IpAddress)getChannel().getLocalAddress() );
102 }
103
104 /***
105 * Convert a JGroups View into a non specific implementation view.
106 *
107 */
108 protected List convertViewListFromJGroups(View view) {
109
110 List res = new Vector();
111
112 if( view == null )
113 return res;
114
115 List jgroupsList = view.getMembers();
116
117 for (Iterator iter = jgroupsList.iterator(); iter.hasNext();) {
118 replica.group.IpAddress addr =
119 convertJGroupsIpAddressToIpAddress( (IpAddress)iter.next() );
120
121 res.add(addr);
122 }
123
124 return res;
125 }
126
127 /***
128 * Join a group. You must set the channel and the group name before
129 * calling this method.
130 *
131 * @see replica.group.GroupManager#joinGroup()
132 * @throws IllegalStateException if the channel is null or the group name
133 * was not set.
134 * @throws CommunicationException if there is a problem connecting to the
135 * channel. You can look on the root cause of this exception
136 * for further information.
137 */
138 public void joinGroup() {
139
140 if( getChannel() == null || getGroupName() == null )
141 throw new IllegalStateException(
142 "You must set the channel and the group name before trying to join."
143 );
144
145 if( getChannel().isConnected() )
146 getChannel().disconnect();
147 try{
148 getChannel().connect( getGroupName() );
149 }
150 catch(ChannelClosedException e){
151 throw new CommunicationException("The channel was closed. Look on the root"+
152 "cause for more information.", e);
153 }
154 catch(ChannelException e){
155 throw new CommunicationException("Problem connecting to channel. Look on the"+
156 " root cause for more information.", e);
157 }
158
159 block = new PullPushAdapter( getChannel() );
160
161 block.setListener( new MyMessageListener() );
162
163 block.addMembershipListener( new MyMembershipListener() );
164
165 block.start();
166
167 logger.info("Joined group [" + getGroupName() + "]. Local address [" + getChannel().getLocalAddress()
168 + "]. View: {"+getChannel().getView()+"}.");
169 }
170
171 /***
172 * Leave a group.
173 *
174 * @see replica.group.GroupManager#leaveGroup()
175 */
176 public void leaveGroup() {
177
178 if( getChannel() != null && getChannel().isConnected() )
179 getChannel().disconnect();
180
181 logger.info("Leaved group [" + getGroupName() + "].");
182 }
183
184 /***
185 * @see replica.group.GroupManager#sendMessage(replica.group.Address, java.lang.Object)
186 */
187 public void sendMessage(Address rAddr, Object message) {
188
189 if( getChannel() == null )
190 throw new IllegalStateException("You are not connected to any group.");
191
192 IpAddress jgroupsRecAddr = null;
193 if( rAddr != null )
194 jgroupsRecAddr =
195 convertIpAddressToJGroupsIpAddress(
196 (replica.group.IpAddress)rAddr);
197 try{
198 logger.info("Sending message {" + message + "}.");
199
200 getChannel().send(null, jgroupsRecAddr, (Serializable)message);
201 }
202 catch(ChannelClosedException e){
203 throw new CommunicationException("Channel is closed. Look on the root"+
204 " cause for more information.",e);
205 }
206 catch(ChannelNotConnectedException e){
207 throw new CommunicationException("Channel not connected. Look on the root"+
208 " cause for more information.",e);
209 }
210 }
211
212 /***
213 * Convert an address in non specific implementation to a jgroups
214 * class.
215 *
216 * @param address
217 * @return
218 */
219 org.jgroups.stack.IpAddress convertIpAddressToJGroupsIpAddress(
220 replica.group.IpAddress address){
221 // TODO using the same class name is confusing
222 // should revise this.
223 return new org.jgroups.stack.IpAddress(
224 address.getAddress(),
225 address.getPort() );
226 }
227
228 /***
229 * Convert an address in jgroups class to an non specific implementation
230 * class.
231 *
232 * @param address
233 * @return
234 */
235 replica.group.IpAddress convertJGroupsIpAddressToIpAddress(
236 org.jgroups.stack.IpAddress address){
237 // TODO using the same class name is confusing
238 // should revise this.
239 replica.group.IpAddress repAddr = new replica.group.IpAddress();
240 repAddr.setAddress( address.getIpAddress() );
241 repAddr.setPort( address.getPort() );
242
243 return repAddr;
244 }
245
246 /***
247 * Short cut method. simply calls #sendMessage(Address, Object) with a null
248 * address.
249 *
250 * @see replica.group.GroupManager#sendMessage(java.lang.Object)
251 */
252 public void sendMessage(Object message) {
253 sendMessage(null, message);
254 }
255
256 /***
257 * Sets the name of the group to be managed. You still have to call
258 * joinGroup after setting it's name.
259 *
260 * @see replica.group.GroupManager#setGroupName(java.lang.String)
261 */
262 public void setGroupName(String groupName) {
263 this.groupName = groupName;
264 }
265
266 /***
267 * @see replica.group.GroupManager#removeListener(replica.group.event.GroupListener)
268 */
269 public boolean removeListener(GroupListener listener) {
270 return groupListeneres.remove(listener);
271 }
272
273 /*
274 * ==== GroupListener notify methods ====
275 */
276
277 /***
278 * Notifies GroupListener members that a new message was received.
279 *
280 */
281 void notifyMessage(Message message){
282
283 MessageEvent event = new MessageEvent(
284 this, message.getObject(),
285 convertJGroupsIpAddressToIpAddress(
286 (IpAddress)message.getSrc() ) );
287
288 for (Iterator iter = groupListeneres.iterator(); iter.hasNext();) {
289 ((GroupListener) iter.next()).message( event);
290 }
291 }
292
293 /***
294 * Notifies GroupListener members that view has changed.
295 *
296 * @param view
297 */
298 void notifyViewChanged(View view){
299 logger.info("View: {" + view + "}.");
300
301 ViewEvent event = new ViewEvent(this,
302 convertViewListFromJGroups( view ) );
303
304 for (Iterator iter = groupListeneres.iterator(); iter.hasNext();) {
305 ((GroupListener) iter.next()).viewChanged( event);
306 }
307 }
308
309 /***
310 * Notifies GroupListener members that a suspect message was received.
311 *
312 * @param address
313 */
314 void notifySuspect(org.jgroups.Address address){
315
316 SuspectEvent event = new SuspectEvent( this,
317 convertJGroupsIpAddressToIpAddress((IpAddress)address) );
318
319 for (Iterator iter = groupListeneres.iterator(); iter.hasNext();) {
320 ((GroupListener) iter.next()).suspect( event);
321 }
322 }
323
324 /***
325 * My message listener implementation.
326 *
327 * @author no119431
328 *
329 * To change the template for this generated type comment go to
330 * Window>Preferences>Java>Code Generation>Code and Comments
331 */
332 class MyMessageListener implements MessageListener{
333
334 /* (non-Javadoc)
335 * @see org.jgroups.MessageListener#getState()
336 */
337 public byte[] getState() {
338 // I'm not interested in this.
339 return null;
340 }
341
342 /***
343 * Simply calls the notify method on the main class.
344 *
345 * @see org.jgroups.MessageListener#receive(org.jgroups.Message)
346 */
347 public void receive(Message message) {
348 notifyMessage(message);
349 }
350
351 /* (non-Javadoc)
352 * @see org.jgroups.MessageListener#setState(byte[])
353 */
354 public void setState(byte[] arg0) {
355 // I'm not interested in this.
356 }
357
358 }
359
360 /***
361 * My Membership Listener implementation.
362 *
363 * @author no119431
364 *
365 * To change the template for this generated type comment go to
366 * Window>Preferences>Java>Code Generation>Code and Comments
367 */
368 class MyMembershipListener implements MembershipListener{
369
370 /* (non-Javadoc)
371 * @see org.jgroups.MembershipListener#block()
372 */
373 public void block() {
374 // TODO Should i do something here?
375
376 }
377
378 /***
379 * Simply calls the notify method on the main class.
380 *
381 * @see org.jgroups.MembershipListener#suspect(org.jgroups.Address)
382 */
383 public void suspect(org.jgroups.Address address) {
384
385 notifySuspect( address);
386 }
387
388 /***
389 * Simply calls the notify method on the main class.
390 *
391 * @see org.jgroups.MembershipListener#viewAccepted(org.jgroups.View)
392 */
393 public void viewAccepted(View view) {
394
395 notifyViewChanged( view);
396 }
397
398 }
399 /***
400 * @return
401 */
402 public ChannelFactory getChannelFactory() {
403 return channelFactory;
404 }
405
406 /***
407 * @param factory
408 */
409 public void setChannelFactory(ChannelFactory factory) {
410 channelFactory = factory;
411 }
412
413 /***
414 * @return
415 */
416 public String getChannelProperties() {
417 return channelProperties;
418 }
419
420 /***
421 * @param string
422 */
423 public void setChannelProperties(String string) {
424 channelProperties = string;
425 }
426
427 public Channel getChannel(){
428
429 if( channel == null ){
430 try{
431 setChannel(
432 getChannelFactory().createChannel( getChannelProperties() ) );
433 }
434 catch(ChannelException e){
435 throw new CommunicationException(e);
436 }
437 }
438
439 return channel;
440 }
441
442 public void setChannel(Channel newChannel){
443 channel = newChannel;
444 }
445
446 /***
447 * Returns a copy of the list of current group listeners.
448 *
449 * Mostly used for testing.
450 * @return
451 */
452 public List getGroupListeneres() {
453 return new Vector( groupListeneres );
454 }
455
456 }
This page was automatically generated by Maven