Skip to content

Commit f80757b

Browse files
committed
Handlers for retrieving node threads.
1 parent 0997118 commit f80757b

8 files changed

Lines changed: 362 additions & 1 deletion

File tree

src/main/java/org/buddycloud/channelserver/channel/ChannelManagerImpl.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import org.buddycloud.channelserver.pubsub.model.NodeAffiliation;
1717
import org.buddycloud.channelserver.pubsub.model.NodeItem;
1818
import org.buddycloud.channelserver.pubsub.model.NodeSubscription;
19+
import org.buddycloud.channelserver.pubsub.model.NodeThread;
1920
import org.xmpp.packet.JID;
2021
import org.xmpp.resultsetmanagement.ResultSet;
2122

@@ -457,5 +458,16 @@ public void deleteUserAffiliations(JID userJid) throws NodeStoreException {
457458
public void deleteUserSubscriptions(JID userJid) throws NodeStoreException {
458459
nodeStore.deleteUserSubscriptions(userJid);
459460
}
461+
462+
@Override
463+
public ResultSet<NodeThread> getNodeThreads(String node, String afterId,
464+
int limit) throws NodeStoreException {
465+
return nodeStore.getNodeThreads(node, afterId, limit);
466+
}
467+
468+
@Override
469+
public int countNodeThreads(String node) throws NodeStoreException {
470+
return nodeStore.countNodeThreads(node);
471+
}
460472

461473
}

src/main/java/org/buddycloud/channelserver/db/NodeStore.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import org.buddycloud.channelserver.pubsub.model.NodeAffiliation;
1212
import org.buddycloud.channelserver.pubsub.model.NodeItem;
1313
import org.buddycloud.channelserver.pubsub.model.NodeSubscription;
14+
import org.buddycloud.channelserver.pubsub.model.NodeThread;
1415
import org.xmpp.packet.JID;
1516
import org.xmpp.resultsetmanagement.ResultSet;
1617

@@ -580,6 +581,10 @@ void deleteNodeItemById(String nodeId, String nodeItemId)
580581

581582
void deleteUserSubscriptions(JID userJid) throws NodeStoreException;
582583

584+
ResultSet<NodeThread> getNodeThreads(String node, String afterId, int limit) throws NodeStoreException;
585+
586+
int countNodeThreads(String node) throws NodeStoreException;
587+
583588
/**
584589
* Closes this node store instance and releases any resources.
585590
*/

src/main/java/org/buddycloud/channelserver/db/jdbc/JDBCNodeStore.java

Lines changed: 70 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,11 @@
2828
import org.buddycloud.channelserver.pubsub.model.NodeAffiliation;
2929
import org.buddycloud.channelserver.pubsub.model.NodeItem;
3030
import org.buddycloud.channelserver.pubsub.model.NodeSubscription;
31+
import org.buddycloud.channelserver.pubsub.model.NodeThread;
3132
import org.buddycloud.channelserver.pubsub.model.impl.NodeAffiliationImpl;
3233
import org.buddycloud.channelserver.pubsub.model.impl.NodeItemImpl;
3334
import org.buddycloud.channelserver.pubsub.model.impl.NodeSubscriptionImpl;
35+
import org.buddycloud.channelserver.pubsub.model.impl.NodeThreadImpl;
3436
import org.buddycloud.channelserver.pubsub.subscription.Subscriptions;
3537
import org.xmpp.packet.JID;
3638
import org.xmpp.resultsetmanagement.ResultSet;
@@ -1651,6 +1653,70 @@ public void deleteUserSubscriptions(JID userJid) throws NodeStoreException {
16511653
}
16521654
}
16531655

1656+
@Override
1657+
public ResultSet<NodeThread> getNodeThreads(String node, String afterId,
1658+
int limit) throws NodeStoreException {
1659+
1660+
Date after = new Date();
1661+
if (afterId != null) {
1662+
NodeItem afterItem = getNodeItem(node, afterId);
1663+
if (afterItem != null) {
1664+
after = afterItem.getUpdated();
1665+
}
1666+
}
1667+
1668+
PreparedStatement stmt = null;
1669+
try {
1670+
stmt = conn.prepareStatement(dialect.selectNodeThreads());
1671+
stmt.setString(1, node);
1672+
stmt.setTimestamp(2, new java.sql.Timestamp(after.getTime()));
1673+
stmt.setInt(3, limit);
1674+
1675+
java.sql.ResultSet rs = stmt.executeQuery();
1676+
ArrayList<NodeThread> nodeThreads = new ArrayList<NodeThread>();
1677+
1678+
NodeThreadImpl currentThread = null;
1679+
while (rs.next()) {
1680+
NodeItem nodeItem = new NodeItemImpl(rs.getString(1),
1681+
rs.getString(2), rs.getTimestamp(3),
1682+
rs.getString(4), rs.getString(5));
1683+
String threadId = rs.getString(6);
1684+
Date threadUpdated = rs.getTimestamp(7);
1685+
if (currentThread == null || !threadId.equals(currentThread.getId())) {
1686+
NodeThreadImpl newThread = new NodeThreadImpl(threadId, threadUpdated);
1687+
nodeThreads.add(newThread);
1688+
currentThread = newThread;
1689+
}
1690+
currentThread.addItem(nodeItem);
1691+
}
1692+
return new ResultSetImpl<NodeThread>(nodeThreads);
1693+
} catch (SQLException e) {
1694+
throw new NodeStoreException(e);
1695+
} finally {
1696+
close(stmt); // Will implicitly close the resultset if required
1697+
}
1698+
}
1699+
1700+
@Override
1701+
public int countNodeThreads(String node) throws NodeStoreException {
1702+
PreparedStatement selectStatement = null;
1703+
try {
1704+
selectStatement = conn
1705+
.prepareStatement(dialect.countNodeThreads());
1706+
selectStatement.setString(1, node);
1707+
java.sql.ResultSet rs = selectStatement.executeQuery();
1708+
if (rs.next()) {
1709+
return rs.getInt(1);
1710+
} else {
1711+
return 0; // This really shouldn't happen!
1712+
}
1713+
} catch (SQLException e) {
1714+
throw new NodeStoreException(e);
1715+
} finally {
1716+
close(selectStatement);
1717+
}
1718+
}
1719+
16541720
@Override
16551721
public Transaction beginTransaction() throws NodeStoreException {
16561722
if (transactionHasBeenRolledBack) {
@@ -1881,7 +1947,10 @@ public interface NodeStoreSQLDialect {
18811947
String deleteUserAffiliations();
18821948

18831949
String deleteUserSubscriptions();
1950+
1951+
String selectNodeThreads();
1952+
1953+
String countNodeThreads();
18841954

18851955
}
1886-
18871956
}

src/main/java/org/buddycloud/channelserver/db/jdbc/dialect/Sql92NodeStoreDialect.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,22 @@ public class Sql92NodeStoreDialect implements NodeStoreSQLDialect {
221221
private static final String DELETE_USER_AFFILIATIONS = "DELETE FROM \"affiliations\" WHERE \"user\" = ?";
222222

223223
private static final String DELETE_USER_SUBSCRIPTIONS = "DELETE FROM \"subscriptions\" WHERE \"user\" = ?";
224+
225+
private static final String SELECT_NODE_THREADS =
226+
"SELECT \"items.node\", \"items.id\", \"items.updated\", \"items.xml\", \"items.in_reply_to\", " +
227+
"\"threads.thread_id\", \"threads.thread_updated\" FROM items," +
228+
"(SELECT MAX(updated) AS thread_updated, thread_id FROM " +
229+
"(SELECT node, updated, (CASE WHEN (in_reply_to IS NULL) THEN id ELSE in_reply_to END) AS thread_id FROM items) AS _items" +
230+
"WHERE node = ? " +
231+
"GROUP BY thread_id " +
232+
"HAVING MAX(updated) < ? " +
233+
"ORDER BY thread_updated DESC LIMIT ?) AS threads " +
234+
"WHERE items.in_reply_to = threads.thread_id OR items.id = threads.thread_id " +
235+
"ORDER BY threads.thread_updated DESC, items.updated;";
236+
237+
private static final String COUNT_NODE_THREADS = "SELECT COUNT(DISTINCT _items.thread_id) " +
238+
"FROM (SELECT node, (CASE WHEN (in_reply_to IS NULL) THEN id ELSE in_reply_to END) AS thread_id " +
239+
"FROM items WHERE node = ?) AS _items;";
224240

225241
@Override
226242
public String insertNode() {
@@ -495,4 +511,14 @@ public String deleteUserAffiliations() {
495511
public String deleteUserSubscriptions() {
496512
return DELETE_USER_SUBSCRIPTIONS;
497513
}
514+
515+
@Override
516+
public String selectNodeThreads() {
517+
return SELECT_NODE_THREADS;
518+
}
519+
520+
@Override
521+
public String countNodeThreads() {
522+
return COUNT_NODE_THREADS;
523+
}
498524
}

src/main/java/org/buddycloud/channelserver/packetprocessor/iq/namespace/pubsub/PubSubGet.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import org.buddycloud.channelserver.packetprocessor.PacketProcessor;
99
import org.buddycloud.channelserver.packetprocessor.iq.namespace.pubsub.get.AffiliationsGet;
1010
import org.buddycloud.channelserver.packetprocessor.iq.namespace.pubsub.get.NodeConfigureGet;
11+
import org.buddycloud.channelserver.packetprocessor.iq.namespace.pubsub.get.NodeThreadsGet;
1112
import org.buddycloud.channelserver.packetprocessor.iq.namespace.pubsub.get.RecentItemsGet;
1213
import org.buddycloud.channelserver.packetprocessor.iq.namespace.pubsub.get.RepliesGet;
1314
import org.buddycloud.channelserver.packetprocessor.iq.namespace.pubsub.get.SubscriptionsGet;
@@ -42,6 +43,7 @@ private void initElementProcessors() {
4243
elementProcessors.add(new ThreadGet(outQueue, channelManager));
4344
elementProcessors.add(new RecentItemsGet(outQueue, channelManager));
4445
elementProcessors.add(new NodeConfigureGet(outQueue, channelManager));
46+
elementProcessors.add(new NodeThreadsGet(outQueue, channelManager));
4547
elementProcessors.add(new RepliesGet(outQueue, channelManager));
4648
}
4749

Lines changed: 184 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,184 @@
1+
package org.buddycloud.channelserver.packetprocessor.iq.namespace.pubsub.get;
2+
3+
import java.io.StringReader;
4+
import java.util.Map;
5+
import java.util.concurrent.BlockingQueue;
6+
7+
import org.buddycloud.channelserver.channel.ChannelManager;
8+
import org.buddycloud.channelserver.channel.Conf;
9+
import org.buddycloud.channelserver.channel.node.configuration.field.AccessModel;
10+
import org.buddycloud.channelserver.db.exception.NodeStoreException;
11+
import org.buddycloud.channelserver.packetprocessor.iq.namespace.pubsub.JabberPubsub;
12+
import org.buddycloud.channelserver.packetprocessor.iq.namespace.pubsub.PubSubElementProcessorAbstract;
13+
import org.buddycloud.channelserver.pubsub.accessmodel.AccessModels;
14+
import org.buddycloud.channelserver.pubsub.affiliation.Affiliations;
15+
import org.buddycloud.channelserver.pubsub.model.NodeAffiliation;
16+
import org.buddycloud.channelserver.pubsub.model.NodeItem;
17+
import org.buddycloud.channelserver.pubsub.model.NodeSubscription;
18+
import org.buddycloud.channelserver.pubsub.model.NodeThread;
19+
import org.buddycloud.channelserver.pubsub.subscription.Subscriptions;
20+
import org.buddycloud.channelserver.utils.node.NodeAclRefuseReason;
21+
import org.buddycloud.channelserver.utils.node.NodeViewAcl;
22+
import org.dom4j.DocumentException;
23+
import org.dom4j.Element;
24+
import org.dom4j.io.SAXReader;
25+
import org.xmpp.packet.IQ;
26+
import org.xmpp.packet.JID;
27+
import org.xmpp.packet.Packet;
28+
import org.xmpp.packet.PacketError;
29+
import org.xmpp.resultsetmanagement.ResultSet;
30+
31+
public class NodeThreadsGet extends PubSubElementProcessorAbstract {
32+
33+
private static final int MAX_THREADS_TO_RETURN = 15;
34+
private int max;
35+
private String afterId;
36+
37+
public NodeThreadsGet(BlockingQueue<Packet> outQueue,
38+
ChannelManager channelManager) {
39+
setChannelManager(channelManager);
40+
setOutQueue(outQueue);
41+
}
42+
43+
@Override
44+
public void process(Element elm, JID actorJID, IQ reqIQ, Element rsm)
45+
throws Exception {
46+
this.request = reqIQ;
47+
this.node = elm.attributeValue("node");
48+
this.actor = actorJID;
49+
this.resultSetManagement = rsm;
50+
this.max = MAX_THREADS_TO_RETURN;
51+
52+
if (actor == null) {
53+
actor = request.getFrom();
54+
}
55+
if (!isValidStanza()) {
56+
outQueue.put(response);
57+
return;
58+
}
59+
if (!userCanViewNode()) {
60+
outQueue.put(response);
61+
return;
62+
}
63+
if (!parseRsmElement()) {
64+
outQueue.put(response);
65+
}
66+
getNodeThreads();
67+
addRsmElement();
68+
outQueue.put(response);
69+
}
70+
71+
private void addRsmElement() throws NodeStoreException {
72+
if (firstItem == null) {
73+
return;
74+
}
75+
Element pubsubEl = response.getElement().addElement("pubsub",
76+
JabberPubsub.NAMESPACE_URI);
77+
Element rsm = pubsubEl.addElement("set", NS_RSM);
78+
rsm.addElement("first", NS_RSM).setText(firstItem);
79+
rsm.addElement("last", NS_RSM).setText(lastItem);
80+
81+
Integer nodeThreadCount = channelManager.countNodeThreads(node);
82+
rsm.addElement("count", NS_RSM).setText(nodeThreadCount.toString());
83+
}
84+
85+
private void getNodeThreads() throws NodeStoreException, DocumentException {
86+
ResultSet<NodeThread> nodeThreads = channelManager.getNodeThreads(node, afterId, max);
87+
this.response = IQ.createResultIQ(request);
88+
Element pubsubEl = response.getElement().addElement("pubsub",
89+
JabberPubsub.NAMESPACE_URI);
90+
SAXReader xmlReader = new SAXReader();
91+
for (NodeThread nodeThread : nodeThreads) {
92+
Element threadEl = pubsubEl.addElement("thread");
93+
threadEl.addAttribute("node", node);
94+
threadEl.addAttribute("id", nodeThread.getId());
95+
threadEl.addAttribute("updated", Conf.formatDate(
96+
nodeThread.getUpdated()));
97+
ResultSet<NodeItem> items = nodeThread.getItems();
98+
for (NodeItem item : items) {
99+
Element entry = xmlReader.read(
100+
new StringReader(item.getPayload())).getRootElement();
101+
Element itemElement = threadEl.addElement("item");
102+
itemElement.addAttribute("id", item.getId());
103+
itemElement.add(entry);
104+
}
105+
}
106+
if (!nodeThreads.isEmpty()) {
107+
this.firstItem = nodeThreads.getFirst(1).iterator().next().getId();
108+
this.lastItem = nodeThreads.getLast(1).iterator().next().getId();
109+
}
110+
}
111+
112+
private boolean isValidStanza() throws NodeStoreException {
113+
if (node == null) {
114+
createExtendedErrorReply(PacketError.Type.modify,
115+
PacketError.Condition.bad_request, "nodeid-required");
116+
return false;
117+
}
118+
if (!channelManager.nodeExists(node)) {
119+
setErrorCondition(PacketError.Type.cancel,
120+
PacketError.Condition.item_not_found);
121+
return false;
122+
}
123+
return true;
124+
}
125+
126+
private AccessModels getNodeAccessModel(Map<String, String> nodeConfiguration) {
127+
if (!nodeConfiguration.containsKey(AccessModel.FIELD_NAME)) {
128+
return AccessModels.authorize;
129+
}
130+
return AccessModels.createFromString(nodeConfiguration
131+
.get(AccessModel.FIELD_NAME));
132+
}
133+
134+
private boolean userCanViewNode() throws NodeStoreException {
135+
NodeSubscription nodeSubscription = channelManager.getUserSubscription(
136+
node, actor);
137+
NodeAffiliation nodeAffiliation = channelManager.getUserAffiliation(
138+
node, actor);
139+
140+
Affiliations affiliation = Affiliations.none;
141+
Subscriptions subscription = Subscriptions.none;
142+
if (nodeSubscription != null) {
143+
affiliation = nodeAffiliation.getAffiliation();
144+
subscription = nodeSubscription.getSubscription();
145+
}
146+
NodeViewAcl nodeViewAcl = new NodeViewAcl();
147+
Map<String, String> nodeConfiguration = channelManager.getNodeConf(node);
148+
149+
if (nodeViewAcl.canViewNode(node, affiliation, subscription,
150+
getNodeAccessModel(nodeConfiguration))) {
151+
return true;
152+
}
153+
154+
NodeAclRefuseReason reason = nodeViewAcl.getReason();
155+
createExtendedErrorReply(reason.getType(), reason.getCondition(),
156+
reason.getAdditionalErrorElement());
157+
return false;
158+
}
159+
160+
private boolean parseRsmElement() throws NodeStoreException {
161+
if (null == resultSetManagement) {
162+
return true;
163+
}
164+
Element maxEl = resultSetManagement.element("max");
165+
if (maxEl != null) {
166+
this.max = Integer.parseInt(maxEl.getTextTrim());
167+
}
168+
Element afterEl = resultSetManagement.element("after");
169+
if (afterEl != null) {
170+
this.afterId = afterEl.getTextTrim();
171+
if (channelManager.getNodeItem(node, afterId) == null) {
172+
setErrorCondition(PacketError.Type.cancel,
173+
PacketError.Condition.item_not_found);
174+
return false;
175+
}
176+
}
177+
return true;
178+
}
179+
180+
@Override
181+
public boolean accept(Element elm) {
182+
return elm.getName().equals("threads");
183+
}
184+
}
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package org.buddycloud.channelserver.pubsub.model;
2+
3+
import java.util.Date;
4+
5+
import org.xmpp.resultsetmanagement.Result;
6+
import org.xmpp.resultsetmanagement.ResultSet;
7+
8+
public interface NodeThread extends Result {
9+
10+
String getId();
11+
12+
Date getUpdated();
13+
14+
ResultSet<NodeItem> getItems();
15+
16+
}

0 commit comments

Comments
 (0)