Skip to content

Commit f529d8b

Browse files
author
jython234
committed
Work some more on client part.
1 parent 5747968 commit f529d8b

3 files changed

Lines changed: 393 additions & 0 deletions

File tree

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
package net.beaconpe.jraklib.client;
2+
3+
import net.beaconpe.jraklib.protocol.EncapsulatedPacket;
4+
5+
/**
6+
* Represents a Connection to a server.
7+
*
8+
* @author jython234
9+
*/
10+
public class Connection {
11+
12+
protected boolean connected;
13+
14+
protected ConnectionManager manager;
15+
16+
public Connection(ConnectionManager manager){
17+
18+
}
19+
20+
public void update(long time){
21+
22+
}
23+
24+
public void handlePacket(byte[] data) {
25+
26+
}
27+
28+
public void addEncapsulatedToQueue(EncapsulatedPacket packet, byte flags) {
29+
30+
}
31+
32+
protected void onShutdown() {
33+
34+
}
35+
}
Lines changed: 327 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,327 @@
1+
package net.beaconpe.jraklib.client;
2+
3+
import static net.beaconpe.jraklib.JRakLib.getAddressFromString;
4+
import static net.beaconpe.jraklib.JRakLib.sleepUntil;
5+
import static net.beaconpe.jraklib.protocol.DataPackets.*;
6+
7+
import net.beaconpe.jraklib.Binary;
8+
import net.beaconpe.jraklib.JRakLib;
9+
import net.beaconpe.jraklib.Logger;
10+
import net.beaconpe.jraklib.protocol.*;
11+
12+
import java.io.IOException;
13+
import java.net.DatagramPacket;
14+
import java.net.InetSocketAddress;
15+
import java.net.SocketAddress;
16+
import java.nio.ByteBuffer;
17+
import java.time.Instant;
18+
import java.util.Arrays;
19+
import java.util.Map;
20+
import java.util.concurrent.ConcurrentHashMap;
21+
import java.util.regex.Pattern;
22+
23+
/**
24+
* Manager for handling a connection. The manager handles the internal ticking, and networking.
25+
*
26+
* @author jython234
27+
*/
28+
public class ConnectionManager {
29+
protected Map<Byte, Class<? extends Packet>> packetPool = new ConcurrentHashMap<>();
30+
31+
protected JRakLibClient client;
32+
33+
protected UDPClientSocket socket;
34+
35+
protected int receiveBytes = 0;
36+
protected int sendBytes = 0;
37+
38+
protected boolean shutdown = false;
39+
40+
protected int ticks = 0;
41+
protected long lastMeasure;
42+
43+
protected Connection connection;
44+
45+
public boolean portChecking = false;
46+
47+
public ConnectionManager(JRakLibClient client, UDPClientSocket socket){
48+
this.client = client;
49+
this.socket = socket;
50+
registerPackets();
51+
52+
connection = new Connection();
53+
try {
54+
if(connect(1447, 4)){
55+
connection.connected = true;
56+
} else {
57+
getLogger().notice("Failed to connect to "+client.getServerEndpoint()+": no response.");
58+
client.pushMainToThreadPacket(new byte[] {JRakLib.PACKET_SHUTDOWN});
59+
}
60+
} catch (IOException e) {
61+
getLogger().emergency("*** FAILED TO CONNECT TO "+client.getServerEndpoint()+": IOException: "+e.getMessage());
62+
e.printStackTrace();
63+
client.pushMainToThreadPacket(new byte[] {JRakLib.PACKET_EMERGENCY_SHUTDOWN});
64+
}
65+
}
66+
67+
private boolean connect(int payloadSize, int packets) throws IOException {
68+
for(int i = 0; i < packets; i++){
69+
OPEN_CONNECTION_REQUEST_1 request1 = new OPEN_CONNECTION_REQUEST_1();
70+
request1.protocol = JRakLib.PROTOCOL;
71+
request1.mtuSize = (short) payloadSize;
72+
request1.encode();
73+
socket.writePacket(request1.buffer, client.getServerEndpoint());
74+
75+
DatagramPacket response = socket.readPacketBlocking(500);
76+
if(response != null && response.getData()[0] == OPEN_CONNECTION_REPLY_1.ID){
77+
connection.handlePacket(response.getData());
78+
return true;
79+
}
80+
}
81+
if(payloadSize == 1447){
82+
return connect(1155, 4);
83+
} else if(payloadSize == 1155){
84+
return connect(531, 5);
85+
} else {
86+
return false;
87+
}
88+
}
89+
90+
public Logger getLogger(){
91+
return client.getLogger();
92+
}
93+
94+
public void run(){
95+
try {
96+
tickProcessor();
97+
} catch (Exception e) {
98+
throw new RuntimeException(e);
99+
}
100+
}
101+
102+
private void tickProcessor() throws IOException {
103+
lastMeasure = Instant.now().toEpochMilli();
104+
105+
while(!shutdown){
106+
long start = Instant.now().toEpochMilli();
107+
int max = 5000;
108+
while(receivePacket()){
109+
max = max - 1;
110+
}
111+
while(receiveStream());
112+
long time = Instant.now().toEpochMilli() - start;
113+
if(time < 50){ //20 ticks per second (1000 / 20)
114+
sleepUntil(Instant.now().toEpochMilli()+(50 - time));
115+
}
116+
tick();
117+
}
118+
}
119+
120+
private void tick() throws IOException {
121+
long time = Instant.now().toEpochMilli();
122+
connection.update(time);
123+
124+
if((ticks & 0b1111) == 0){
125+
double diff = Math.max(0.005d, time - lastMeasure);
126+
streamOption("bandwith", "up:"+(sendBytes / diff)+",down:"+(receiveBytes / diff)); //TODO: Fix this stuff
127+
lastMeasure = time;
128+
sendBytes = 0;
129+
receiveBytes = 0;
130+
}
131+
ticks = ticks + 1;
132+
}
133+
134+
private boolean receivePacket() throws IOException {
135+
DatagramPacket packet = socket.readPacket();
136+
if(packet == null) {
137+
return false;
138+
}
139+
int len = packet.getLength();
140+
if(len > 0){
141+
SocketAddress source = packet.getSocketAddress();
142+
receiveBytes += len;
143+
144+
Packet pkt = getPacketFromPool(packet.getData()[0]);
145+
if(pkt != null){
146+
pkt.buffer = packet.getData();
147+
//getSession(getAddressFromString(source.toString()), packet.getPort()).handlePacket(pkt);
148+
return true;
149+
} else if (packet.getData() != null){
150+
streamRaw(source, packet.getData());
151+
return true;
152+
} else {
153+
getLogger().notice("Dropped packet: "+ Arrays.toString(packet.getData()));
154+
return false;
155+
}
156+
}
157+
return false;
158+
}
159+
160+
public void streamEncapsulated(EncapsulatedPacket packet){
161+
streamEncapsulated(packet, JRakLib.PRIORITY_NORMAL);
162+
}
163+
164+
public void streamEncapsulated(EncapsulatedPacket packet, byte flags){
165+
String id = client.getServerIP() + ":" + client.getServerPort();
166+
ByteBuffer bb = ByteBuffer.allocate(3+id.getBytes().length+packet.getTotalLength(true));
167+
bb.put(JRakLib.PACKET_ENCAPSULATED).put((byte) id.getBytes().length).put(id.getBytes()).put(flags).put(packet.toBinary(true));
168+
client.pushThreadToMainPacket(bb.array());
169+
}
170+
171+
public void streamRaw(SocketAddress address, byte[] payload){
172+
String dest;
173+
int port;
174+
if(address.toString().contains("/")) {
175+
dest = address.toString().split(Pattern.quote("/"))[1].split(Pattern.quote(":"))[0];
176+
port = Integer.parseInt(address.toString().split(Pattern.quote("/"))[1].split(Pattern.quote(":"))[1]);
177+
} else {
178+
dest = address.toString().split(Pattern.quote(":"))[0];
179+
port = Integer.parseInt(address.toString().split(Pattern.quote(":"))[1]);
180+
}
181+
streamRaw(dest, port, payload);
182+
}
183+
184+
public void streamRaw(String address, int port, byte[] payload){
185+
ByteBuffer bb = ByteBuffer.allocate(4 + address.getBytes().length + payload.length);
186+
bb.put(JRakLib.PACKET_RAW).put((byte) address.getBytes().length).put(address.getBytes()).put(Binary.writeShort((short) port)).put(payload);
187+
client.pushThreadToMainPacket(bb.array());
188+
}
189+
190+
protected void streamClose(String identifier, String reason){
191+
ByteBuffer bb = ByteBuffer.allocate(3 + identifier.getBytes().length + reason.getBytes().length);
192+
bb.put(JRakLib.PACKET_CLOSE_SESSION).put((byte) identifier.getBytes().length).put(identifier.getBytes()).put((byte) reason.getBytes().length).put(reason.getBytes());
193+
client.pushThreadToMainPacket(bb.array());
194+
}
195+
196+
protected void streamInvalid(String identifier){
197+
ByteBuffer bb = ByteBuffer.allocate(2+identifier.getBytes().length);
198+
bb.put(JRakLib.PACKET_INVALID_SESSION).put((byte) identifier.getBytes().length).put(identifier.getBytes());
199+
client.pushThreadToMainPacket(bb.array());
200+
}
201+
202+
protected void streamOpen(long serverId){
203+
String identifier = client.getServerIP() + ":" + client.getServerPort();
204+
ByteBuffer bb = ByteBuffer.allocate(9 + identifier.getBytes().length);
205+
bb.put(JRakLib.PACKET_OPEN_SESSION).put((byte) identifier.getBytes().length).put(identifier.getBytes()).put(Binary.writeLong(serverId));
206+
client.pushThreadToMainPacket(bb.array());
207+
}
208+
209+
protected void streamACK(String identifier, int identifierACK){
210+
ByteBuffer bb = ByteBuffer.allocate(6+identifier.getBytes().length);
211+
bb.put(JRakLib.PACKET_ACK_NOTIFICATION).put((byte) identifier.getBytes().length).put(identifier.getBytes()).put(Binary.writeInt(identifierACK));
212+
client.pushThreadToMainPacket(bb.array());
213+
}
214+
215+
protected void streamOption(String name, String value){
216+
ByteBuffer bb = ByteBuffer.allocate(2+name.getBytes().length+value.getBytes().length);
217+
bb.put(JRakLib.PACKET_SET_OPTION).put((byte) name.getBytes().length).put(name.getBytes()).put(value.getBytes());
218+
client.pushThreadToMainPacket(bb.array());
219+
}
220+
221+
public void sendPacket(Packet packet, String dest, int port) throws IOException {
222+
packet.encode();
223+
sendBytes += packet.buffer.length;
224+
socket.writePacket(packet.buffer, new InetSocketAddress(dest, port));
225+
}
226+
227+
public boolean receiveStream() throws IOException {
228+
byte[] packet = client.readMainToThreadPacket();
229+
if(packet == null){
230+
return false;
231+
}
232+
if(packet.length > 0){
233+
byte id = packet[0];
234+
int offset = 1;
235+
if(id == JRakLib.PACKET_ENCAPSULATED){
236+
int len = packet[offset++];
237+
String identifier = new String(Binary.subbytes(packet, offset, len));
238+
offset += len;
239+
byte flags = packet[offset++];
240+
byte[] buffer = Binary.subbytes(packet, offset);
241+
connection.addEncapsulatedToQueue(EncapsulatedPacket.fromBinary(buffer, true), flags);
242+
} else if(id == JRakLib.PACKET_RAW){
243+
int len = packet[offset++];
244+
String address = new String(Binary.subbytes(packet, offset, len));
245+
offset += len;
246+
int port = Binary.readShort(Binary.subbytes(packet, offset, 2));
247+
offset += 2;
248+
byte[] payload = Binary.subbytes(packet, offset);
249+
socket.writePacket(payload, new InetSocketAddress(address, port));
250+
} else if(id == JRakLib.PACKET_CLOSE_SESSION){
251+
/*
252+
int len = packet[offset++];
253+
String identifier = new String(Binary.subbytes(packet, offset, len));
254+
*/
255+
client.pushThreadToMainPacket(packet);
256+
} else if(id == JRakLib.PACKET_SET_OPTION){
257+
int len = packet[offset++];
258+
String name = new String(Binary.subbytes(packet, offset, len));
259+
offset += len;
260+
String value = new String(Binary.subbytes(packet, offset));
261+
switch(name){
262+
case "portChecking":
263+
portChecking = Boolean.parseBoolean(value);
264+
break;
265+
}
266+
} else if(id == JRakLib.PACKET_SHUTDOWN){
267+
connection.onShutdown();
268+
269+
socket.close();
270+
shutdown = true;
271+
} else if(id == JRakLib.PACKET_EMERGENCY_SHUTDOWN){
272+
shutdown = true;
273+
} else {
274+
return false;
275+
}
276+
return true;
277+
}
278+
return false;
279+
}
280+
281+
private void registerPacket(byte id, Class<? extends Packet> clazz){
282+
packetPool.put(id, clazz);
283+
}
284+
285+
public Packet getPacketFromPool(byte id){
286+
if(packetPool.containsKey(id)){
287+
try {
288+
return packetPool.get(id).newInstance();
289+
} catch (InstantiationException e) {
290+
e.printStackTrace();
291+
} catch (IllegalAccessException e) {
292+
e.printStackTrace();
293+
}
294+
}
295+
return null;
296+
}
297+
298+
private void registerPackets() {
299+
registerPacket(UNCONNECTED_PING.ID, UNCONNECTED_PING.class);
300+
registerPacket(UNCONNECTED_PING_OPEN_CONNECTIONS.ID, UNCONNECTED_PING_OPEN_CONNECTIONS.class);
301+
registerPacket(OPEN_CONNECTION_REQUEST_1.ID, OPEN_CONNECTION_REQUEST_1.class);
302+
registerPacket(OPEN_CONNECTION_REPLY_1.ID, OPEN_CONNECTION_REPLY_1.class);
303+
registerPacket(OPEN_CONNECTION_REQUEST_2.ID, OPEN_CONNECTION_REQUEST_2.class);
304+
registerPacket(OPEN_CONNECTION_REPLY_2.ID, OPEN_CONNECTION_REPLY_2.class);
305+
registerPacket(UNCONNECTED_PONG.ID, UNCONNECTED_PONG.class);
306+
registerPacket(ADVERTISE_SYSTEM.ID, ADVERTISE_SYSTEM.class);
307+
registerPacket(DATA_PACKET_0.ID, DATA_PACKET_0.class);
308+
registerPacket(DATA_PACKET_1.ID, DATA_PACKET_1.class);
309+
registerPacket(DATA_PACKET_2.ID, DATA_PACKET_2.class);
310+
registerPacket(DATA_PACKET_3.ID, DATA_PACKET_3.class);
311+
registerPacket(DATA_PACKET_4.ID, DATA_PACKET_4.class);
312+
registerPacket(DATA_PACKET_5.ID, DATA_PACKET_5.class);
313+
registerPacket(DATA_PACKET_6.ID, DATA_PACKET_6.class);
314+
registerPacket(DATA_PACKET_7.ID, DATA_PACKET_7.class);
315+
registerPacket(DATA_PACKET_8.ID, DATA_PACKET_8.class);
316+
registerPacket(DATA_PACKET_9.ID, DATA_PACKET_9.class);
317+
registerPacket(DATA_PACKET_A.ID, DATA_PACKET_A.class);
318+
registerPacket(DATA_PACKET_B.ID, DATA_PACKET_B.class);
319+
registerPacket(DATA_PACKET_C.ID, DATA_PACKET_C.class);
320+
registerPacket(DATA_PACKET_D.ID, DATA_PACKET_D.class);
321+
registerPacket(DATA_PACKET_E.ID, DATA_PACKET_E.class);
322+
registerPacket(DATA_PACKET_F.ID, DATA_PACKET_F.class);
323+
registerPacket(NACK.ID, NACK.class);
324+
registerPacket(ACK.ID, ACK.class);
325+
}
326+
327+
}

0 commit comments

Comments
 (0)