Skip to content

Commit 815482f

Browse files
author
Lloyd Watkin
committed
Handle subscription requests and add some initial tests for subscribing
1 parent 86b70a1 commit 815482f

2 files changed

Lines changed: 291 additions & 45 deletions

File tree

  • src
    • main/java/org/buddycloud/channelserver/packetprocessor/iq/namespace/pubsub/set
    • test/java/org/buddycloud/channelserver/packetprocessor/iq/namespace/pubsub/set

src/main/java/org/buddycloud/channelserver/packetprocessor/iq/namespace/pubsub/set/SubscribeSet.java

Lines changed: 59 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
21
package org.buddycloud.channelserver.packetprocessor.iq.namespace.pubsub.set;
32

43
import java.util.Collection;
@@ -36,10 +35,13 @@
3635
import org.xmpp.resultsetmanagement.ResultSet;
3736

3837
public class SubscribeSet extends PubSubElementProcessorAbstract {
39-
38+
4039
private static final String FIREHOSE = "/firehose";
4140
private static final Logger LOGGER = Logger.getLogger(SubscribeSet.class);
42-
41+
42+
public static final String MISSING_NODE_ID = "nodeid-required";
43+
public static final String INVALID_JID = "invalid-jid";
44+
4345
private final BlockingQueue<Packet> outQueue;
4446
private final ChannelManager channelManager;
4547

@@ -52,10 +54,11 @@ public SubscribeSet(BlockingQueue<Packet> outQueue,
5254
@Override
5355
public void process(Element elm, JID actorJID, IQ reqIQ, Element rsm)
5456
throws Exception {
55-
56-
node = elm.attributeValue("node");
57+
58+
node = reqIQ.getChildElement().element("subscribe")
59+
.attributeValue("node");
5760
request = reqIQ;
58-
61+
5962
if ((node == null) || (node.equals(""))) {
6063
missingNodeName();
6164
return;
@@ -75,9 +78,9 @@ public void process(Element elm, JID actorJID, IQ reqIQ, Element rsm)
7578
return;
7679
}
7780
}
78-
81+
7982
Map<String, String> nodeConf = null;
80-
83+
8184
if (node.equals(FIREHOSE)) {
8285
if (!channelManager.nodeExists(FIREHOSE)) {
8386
channelManager.addRemoteNode(FIREHOSE);
@@ -93,10 +96,7 @@ public void process(Element elm, JID actorJID, IQ reqIQ, Element rsm)
9396
}
9497

9598
// Subscribe to a node.
96-
97-
Transaction t = null;
9899
try {
99-
t = channelManager.beginTransaction();
100100

101101
NodeSubscription nodeSubscription = channelManager
102102
.getUserSubscription(node, subscribingJid);
@@ -129,7 +129,7 @@ public void process(Element elm, JID actorJID, IQ reqIQ, Element rsm)
129129

130130
Affiliations defaultAffiliation = null;
131131
Subscriptions defaultSubscription = null;
132-
132+
133133
if (!possibleExistingSubscription.in(Subscriptions.none)) {
134134
LOGGER.debug("User already has a '"
135135
+ possibleExistingSubscription.toString()
@@ -146,15 +146,24 @@ public void process(Element elm, JID actorJID, IQ reqIQ, Element rsm)
146146
}
147147
defaultSubscription = Subscriptions.subscribed;
148148
String accessModel = nodeConf.get(Conf.ACCESS_MODEL);
149-
if ((null == accessModel)
150-
|| (accessModel.equals(AccessModels.authorize.toString()))) {
149+
150+
if ((null == accessModel)
151+
|| (true == accessModel.equals(AccessModels.authorize
152+
.toString()))
153+
|| (true == accessModel.equals(AccessModels.whitelist
154+
.toString()))) {
155+
defaultSubscription = Subscriptions.pending;
156+
} else if ((true == accessModel.equals(AccessModels.local
157+
.toString()) && (false == channelManager
158+
.isLocalJID(subscribingJid)))) {
151159
defaultSubscription = Subscriptions.pending;
152160
}
153-
154-
NodeSubscription newSubscription = new NodeSubscriptionImpl(node,
155-
subscribingJid, request.getFrom(), defaultSubscription);
161+
162+
NodeSubscription newSubscription = new NodeSubscriptionImpl(
163+
node, subscribingJid, request.getFrom(),
164+
defaultSubscription);
156165
channelManager.addUserSubscription(newSubscription);
157-
166+
158167
if (null != possibleExistingAffiliation) {
159168
defaultAffiliation = possibleExistingAffiliation;
160169
}
@@ -176,29 +185,33 @@ public void process(Element elm, JID actorJID, IQ reqIQ, Element rsm)
176185

177186
outQueue.put(reply);
178187

179-
notifySubscribers(defaultSubscription, defaultAffiliation, subscribingJid);
188+
notifySubscribers(defaultSubscription, defaultAffiliation,
189+
subscribingJid);
180190

181-
t.commit();
182-
} finally {
183-
if (t != null) {
184-
t.close();
185-
}
191+
} catch (NodeStoreException e) {
192+
IQ reply = IQ.createResultIQ(request);
193+
reply.setType(Type.error);
194+
PacketError pe = new PacketError(
195+
PacketError.Condition.internal_server_error,
196+
PacketError.Type.wait);
197+
reply.setError(pe);
198+
outQueue.put(reply);
186199
}
187200
}
188201

189-
private boolean handleNodeSubscription(Element elm, JID actorJID, JID subscribingJid)
190-
throws NodeStoreException, InterruptedException {
202+
private boolean handleNodeSubscription(Element elm, JID actorJID,
203+
JID subscribingJid) throws NodeStoreException, InterruptedException {
191204
if ((!channelManager.isLocalNode(node)) && (!node.equals("/firehose"))) {
192-
makeRemoteRequest();
193-
return false;
194-
}
205+
makeRemoteRequest();
206+
return false;
207+
}
195208

196209
// 6.1.3.1 JIDs Do Not Match
197210

198211
// Covers where we have juliet@shakespeare.lit/the-balcony
199-
String[] jidParts = elm.attributeValue("jid").split("/");
200-
String jid = jidParts[0];
201-
if (!subscribingJid.toBareJID().equals(jid)) {
212+
JID jid = new JID(request.getChildElement().element("subscribe")
213+
.attributeValue("jid"));
214+
if (!subscribingJid.toBareJID().equals(jid.toBareJID())) {
202215

203216
/*
204217
* // 6.1.3.1 JIDs Do Not Match <iq type='error'
@@ -213,7 +226,7 @@ private boolean handleNodeSubscription(Element elm, JID actorJID, JID subscribin
213226

214227
Element badRequest = new DOMElement("bad-request",
215228
new org.dom4j.Namespace("", JabberPubsub.NS_XMPP_STANZAS));
216-
Element nodeIdRequired = new DOMElement("invalid-jid",
229+
Element nodeIdRequired = new DOMElement(INVALID_JID,
217230
new org.dom4j.Namespace("", JabberPubsub.NS_PUBSUB_ERROR));
218231
Element error = new DOMElement("error");
219232
error.addAttribute("type", PacketError.Type.modify.toXMPP());
@@ -239,16 +252,15 @@ private boolean handleNodeSubscription(Element elm, JID actorJID, JID subscribin
239252

240253
private void makeRemoteRequest() throws InterruptedException {
241254
request.setTo(new JID(node.split("/")[2]).getDomain());
242-
Element actor = request.getElement()
243-
.element("pubsub")
244-
.addElement("actor", JabberPubsub.NS_BUDDYCLOUD);
255+
Element actor = request.getElement().element("pubsub")
256+
.addElement("actor", JabberPubsub.NS_BUDDYCLOUD);
245257
actor.addText(request.getFrom().toBareJID());
246-
outQueue.put(request);
258+
outQueue.put(request);
247259
}
248260

249261
private void notifySubscribers(Subscriptions subscriptionStatus,
250-
Affiliations affiliationType, JID subscribingJid) throws NodeStoreException,
251-
InterruptedException {
262+
Affiliations affiliationType, JID subscribingJid)
263+
throws NodeStoreException, InterruptedException {
252264

253265
ResultSet<NodeSubscription> subscribers = channelManager
254266
.getNodeSubscriptionListeners(node);
@@ -284,9 +296,9 @@ private void notifySubscribers(Subscriptions subscriptionStatus,
284296
affiliation.addAttribute("affiliation", affiliationType.toString());
285297

286298
Message rootElement = new Message(message);
287-
299+
288300
for (NodeSubscription subscriber : subscribers) {
289-
301+
290302
Message notification = rootElement.createCopy();
291303
notification.setTo(subscriber.getListener());
292304
outQueue.put(notification);
@@ -304,13 +316,14 @@ private void notifySubscribers(Subscriptions subscriptionStatus,
304316
}
305317
}
306318

307-
private Message getPendingSubscriptionNotification(String receiver, String subscriber) {
319+
private Message getPendingSubscriptionNotification(String receiver,
320+
String subscriber) {
308321

309322
Document document = getDocumentHelper();
310323
Element message = document.addElement("message");
311324
message.addAttribute("from", request.getTo().toString());
312325
message.addAttribute("type", "headline");
313-
message.addAttribute("to", receiver);
326+
message.addAttribute("to", receiver);
314327
DataForm dataForm = new DataForm(DataForm.Type.form);
315328
dataForm.addInstruction("Allow " + subscriber
316329
+ " to subscribe to node " + node + "?");
@@ -330,7 +343,8 @@ private Message getPendingSubscriptionNotification(String receiver, String subsc
330343
jid.setLabel("Subscriber Address");
331344
jid.setVariable(JabberPubsub.VAR_SUBSCRIBER_JID);
332345
FormField allow = dataForm.addField();
333-
allow.setLabel("Allow " + subscriber + " to subscribe to posts of " + node + "?");
346+
allow.setLabel("Allow " + subscriber + " to subscribe to posts of "
347+
+ node + "?");
334348
allow.setVariable(JabberPubsub.VAR_ALLOW);
335349
allow.addValue("false");
336350
allow.setType(FormField.Type.boolean_type);
@@ -374,7 +388,7 @@ private void missingNodeName() throws InterruptedException {
374388
Element badRequest = new DOMElement("bad-request",
375389
new org.dom4j.Namespace("", JabberPubsub.NS_XMPP_STANZAS));
376390

377-
Element nodeIdRequired = new DOMElement("nodeid-required",
391+
Element nodeIdRequired = new DOMElement(MISSING_NODE_ID,
378392
new org.dom4j.Namespace("", JabberPubsub.NS_PUBSUB_ERROR));
379393

380394
Element error = new DOMElement("error");

0 commit comments

Comments
 (0)