-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathChordNameServiceImpl.java
More file actions
160 lines (132 loc) · 4.8 KB
/
ChordNameServiceImpl.java
File metadata and controls
160 lines (132 loc) · 4.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.*;
public class ChordNameServiceImpl {
private DistributedTextEditor dte;
private int port;
protected InetSocketAddress myName;
protected int myKey;
private InetSocketAddress suc;
private InetSocketAddress pre;
private InetSocketAddress connectedAt;
private ServerSocket serverSocket;
private Socket preSocket, sucSocket;
private DisconnectThread disconnectThread;
private Socket hack;
public Socket getSucSocket() {
return sucSocket;
}
public void setSucSocket(Socket sucSocket) {
this.sucSocket = sucSocket;
}
public Socket getPreSocket() {
return preSocket;
}
public void setPreSocket(Socket preSocket) {
this.preSocket = preSocket;
}
private boolean active;
private boolean first;
private ServerThread serverThread;
public ChordNameServiceImpl(InetSocketAddress myName, DistributedTextEditor dte){
this.myName = myName;
this.port = myName.getPort();
this.dte = dte;
}
public int keyOfName(InetSocketAddress name) {
int tmp = name.hashCode()*1073741651 % 2147483647;
if (tmp < 0) { tmp = -tmp; }
return tmp;
}
public InetSocketAddress getChordName() {
return myName;
}
public void createGroup(){
serverThread = new ServerThread(dte,this);
new Thread(serverThread).start();
}
public void joinGroup(InetSocketAddress knownPeer) {
active = true;
connectedAt = knownPeer;
try {
// Setup successor
sucSocket = new Socket(knownPeer.getAddress(),port);
dte.newEventPlayer(sucSocket, myKey);
// Wait for new predecessor
ServerSocket server = new ServerSocket(port);
System.out.println("waiting");
final ServerSocket finalServer = server;
new Thread(new Runnable(){
@Override
public void run() {
try {
Thread.sleep(1000);
System.out.println("about to close");
finalServer.close();
System.out.println("closed");
} catch (InterruptedException | IOException e) {
e.printStackTrace();
}
}
}).start();
preSocket = server.accept();
System.out.println("Done waiting");
System.out.println("else");
// Start listening for disconnects from successor
disconnectThread = new DisconnectThread(dte, this, sucSocket);
new Thread(disconnectThread).start();
dte.newEventReplayer(preSocket, myKey);
// Keep listening for new joins
serverThread = new ServerThread(dte, this, server);
new Thread(serverThread).start();
}
catch (SocketException e1){
first = true;
System.out.println("first");
preSocket = sucSocket;
dte.newEventReplayer(preSocket, myKey);
try {
hack = new Socket(preSocket.getInetAddress(), port+1);
// Start listening for disconnects from successor
disconnectThread = new DisconnectThread(dte,this,hack);
new Thread(disconnectThread).start();
// Keep listening for new joins
ServerSocket server = new ServerSocket(port);
serverThread = new ServerThread(dte,this,server);
new Thread(serverThread).start();
} catch (IOException e) {
e.printStackTrace();
}
}
catch (IOException e) {
e.printStackTrace();
}
}
public void notFirst(){
first = false;
}
public void leaveGroup() {
try {
ObjectOutputStream disconnectStream = null;
if (first) {
disconnectStream = new ObjectOutputStream(hack.getOutputStream());
} else
disconnectStream = new ObjectOutputStream(preSocket.getOutputStream());
disconnectStream.writeObject(new DisconnectEvent(sucSocket.getInetAddress()));
preSocket.close();
sucSocket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
/*
* If joining we should now enter the existing group and
* should at some point register this peer on its port if not
* already done and start listening for incoming connection
* from other peers who want to enter or leave the
* group. After leaveGroup() was called, the run() method
* should return so that the thread running it might
* terminate.
*/
}