Skip to content

Commit aa4cab3

Browse files
committed
Merge pull request #67 from viv/master
Added unique IDs per sender
2 parents 31be56e + fe485ff commit aa4cab3

2 files changed

Lines changed: 110 additions & 15 deletions

File tree

src/main/java/org/buddycloud/channelserver/queue/FederatedQueueManager.java

Lines changed: 60 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
package org.buddycloud.channelserver.queue;
22

3+
import java.io.UnsupportedEncodingException;
4+
import java.security.MessageDigest;
5+
import java.security.NoSuchAlgorithmException;
36
import java.util.ArrayList;
4-
import java.util.HashMap;
57
import java.util.List;
68
import java.util.concurrent.BlockingQueue;
79
import java.util.concurrent.ConcurrentHashMap;
8-
import java.util.concurrent.LinkedBlockingQueue;
910

1011
import org.apache.log4j.Logger;
1112
import org.buddycloud.channelserver.ChannelsEngine;
@@ -14,7 +15,6 @@
1415
import org.dom4j.Element;
1516
import org.dom4j.Namespace;
1617
import org.dom4j.dom.DOMElement;
17-
import org.hsqldb.Server;
1818
import org.xmpp.component.ComponentException;
1919
import org.xmpp.packet.IQ;
2020
import org.xmpp.packet.JID;
@@ -41,14 +41,14 @@ public class FederatedQueueManager {
4141
private ConcurrentHashMap<String, Integer> remoteServerItemsToProcess = new ConcurrentHashMap<String, Integer>();
4242
private ConcurrentHashMap<String, String> remoteServerInfoRequestIds = new ConcurrentHashMap<String, String>();
4343
private ConcurrentHashMap<String, List<Packet>> waitingStanzas = new ConcurrentHashMap<String, List<Packet>>();
44-
44+
45+
private ConcurrentHashMap<String, String> idMap = new ConcurrentHashMap<String, String>();
46+
4547
private ExpiringPacketQueue sentRemotePackets = new ExpiringPacketQueue();
4648
private ExpiringPacketQueue nodeMap = new ExpiringPacketQueue();
4749

4850
private String localServer;
4951

50-
private BlockingQueue<Packet> federatedResponseQueue;
51-
5252
public FederatedQueueManager(ChannelsEngine component, String localServer) {
5353
this.component = component;
5454
this.localServer = localServer;
@@ -66,7 +66,12 @@ private int getId() {
6666
public void process(Packet packet) throws ComponentException {
6767

6868
String to = packet.getTo().toString();
69-
sentRemotePackets.put(packet.getID(), packet.getFrom());
69+
70+
String uniqueId = generateUniqueId(packet);
71+
idMap.put(uniqueId, packet.getID());
72+
packet.setID(uniqueId);
73+
74+
sentRemotePackets.put(uniqueId, packet.getFrom());
7075
try {
7176
extractNodeDetails(packet);
7277
// Do we have a map already?
@@ -245,10 +250,14 @@ public void passResponseToRequester(IQ packet) throws Exception {
245250
"Can not find original requesting packet! (ID:"
246251
+ packet.getID() + ")");
247252
}
248-
packet.setTo((JID) sentRemotePackets.get(packet.getID()));
253+
254+
String uniqueId = packet.getID();
255+
packet.setID(idMap.get(uniqueId));
256+
packet.setTo((JID) sentRemotePackets.get(uniqueId));
249257
packet.setFrom(localServer);
250258
sentRemotePackets.remove(packet.getID());
251-
259+
idMap.remove(uniqueId);
260+
252261
component.sendPacket(packet);
253262
}
254263

@@ -271,4 +280,46 @@ public void addChannelMap(JID server) {
271280
logger.error(e);
272281
}
273282
}
283+
284+
/**
285+
* Generate a unique ID for a packet
286+
*
287+
* Supplied packet IDs might not be unique so we use the ID and the FROM
288+
* values to create a hash which we map back to the original packet ID.
289+
*
290+
* @param packet
291+
* @return unique ID for the packet
292+
*/
293+
private String generateUniqueId(Packet packet) {
294+
return generateMd5(packet.getID() + packet.getFrom());
295+
}
296+
297+
/**
298+
* Generates an MD5 hash of a supplied String
299+
*
300+
* @param message to encode
301+
* @return MD5 Hash of supplied string
302+
*/
303+
private String generateMd5(String message) {
304+
String digest = null;
305+
try {
306+
MessageDigest md = MessageDigest.getInstance("MD5");
307+
byte[] hash = md.digest(message.getBytes("UTF-8"));
308+
309+
//converting byte array to Hexadecimal String
310+
StringBuilder sb = new StringBuilder(2*hash.length);
311+
for(byte b : hash) {
312+
sb.append(String.format("%02x", b&0xff));
313+
}
314+
315+
digest = sb.toString();
316+
} catch (UnsupportedEncodingException e) {
317+
logger.info("Error generating unique packet ID");
318+
logger.error(e);
319+
} catch (NoSuchAlgorithmException e) {
320+
logger.info("Error generating unique packet ID");
321+
logger.error(e);
322+
}
323+
return digest;
324+
}
274325
}

src/test/java/org/buddycloud/channelserver/queue/FederatedQueueManagerTest.java

Lines changed: 50 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,7 @@ public void testPassingChannelServerIdentifierViaItemsResultsInQueuedPacketSendi
161161
Assert.assertEquals(expectedForwaredPacket.toXML(),
162162
originalPacketRedirected.toXML());
163163
}
164-
164+
165165
@Test
166166
public void testOutgoingFederatedPacketsAreRoutedBackToOriginalSender() throws Exception {
167167

@@ -175,14 +175,58 @@ public void testOutgoingFederatedPacketsAreRoutedBackToOriginalSender() throws E
175175
packet.getElement().addAttribute("remote-server-discover", "false");
176176

177177
queueManager.process(packet.createCopy());
178-
channelsEngine.poll();
179-
180-
IQ response = IQ.createResultIQ(packet);
178+
IQ originalPacketRedirected = (IQ) channelsEngine.poll();
179+
180+
IQ response = IQ.createResultIQ(originalPacketRedirected);
181181
queueManager.passResponseToRequester(response);
182-
182+
183183
Assert.assertEquals(1, channelsEngine.size());
184184
Packet redirected = channelsEngine.poll();
185-
185+
186+
System.out.println(packet);
187+
System.out.println(redirected);
188+
186189
Assert.assertEquals(packet.getFrom(), redirected.getTo());
187190
}
191+
192+
@Test
193+
public void testOutgoingFederatedPacketsFromDifferentClientsUsingSameIdAreRoutedBackToOriginalSender() throws Exception {
194+
195+
channelsEngine.clear();
196+
197+
IQ clientOnePacket = new IQ();
198+
clientOnePacket.setID("1:some-request");
199+
clientOnePacket.setFrom(new JID("romeo@montague.lit/street"));
200+
clientOnePacket.setTo(new JID("topics.capulet.lit"));
201+
clientOnePacket.setType(IQ.Type.get);
202+
clientOnePacket.getElement().addAttribute("remote-server-discover", "false");
203+
204+
IQ clientTwoPacket = new IQ();
205+
clientTwoPacket.setID("1:some-request");
206+
clientTwoPacket.setFrom(new JID("juliet@montague.lit/street"));
207+
clientTwoPacket.setTo(new JID("topics.capulet.lit"));
208+
clientTwoPacket.setType(IQ.Type.get);
209+
clientTwoPacket.getElement().addAttribute("remote-server-discover", "false");
210+
211+
queueManager.addChannelMap(new JID("topics.capulet.lit"));
212+
213+
queueManager.process(clientOnePacket.createCopy());
214+
queueManager.process(clientTwoPacket.createCopy());
215+
216+
IQ clientOneOriginalPacketRedirected = (IQ) channelsEngine.poll();
217+
IQ clientTwoOriginalPacketRedirected = (IQ) channelsEngine.poll();
218+
219+
IQ clientOneResponse = IQ.createResultIQ(clientOneOriginalPacketRedirected);
220+
queueManager.passResponseToRequester(clientOneResponse);
221+
222+
IQ clientTwoResponse = IQ.createResultIQ(clientTwoOriginalPacketRedirected);
223+
queueManager.passResponseToRequester(clientTwoResponse);
224+
225+
Assert.assertEquals(2, channelsEngine.size());
226+
Packet clientOneRedirected = channelsEngine.poll();
227+
Packet clientTwoRedirected = channelsEngine.poll();
228+
229+
Assert.assertEquals(clientOnePacket.getFrom(), clientOneRedirected.getTo());
230+
Assert.assertEquals(clientTwoPacket.getFrom(), clientTwoRedirected.getTo());
231+
}
188232
}

0 commit comments

Comments
 (0)