Skip to content

Commit 47ab404

Browse files
committed
netty serializer
1 parent d152cbc commit 47ab404

10 files changed

Lines changed: 187 additions & 23 deletions

File tree

tx-commons/src/main/java/com/codingapi/txlcn/commons/util/serializer/ISerializer.java

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,18 +18,22 @@
1818

1919
import com.codingapi.txlcn.commons.exception.SerializerException;
2020

21+
import java.io.InputStream;
22+
import java.io.OutputStream;
23+
2124
/**
2225
* @author lorne 2017/11/11
2326
*/
2427
public interface ISerializer {
28+
2529
/**
2630
* 序列化对象
2731
*
2832
* @param obj 需要序更列化的对象
29-
* @return byte [] 序列号结果
33+
* @param outputStream 写入对象
3034
* @throws SerializerException 序列化异常
3135
*/
32-
byte[] serialize(Object obj) throws SerializerException;
36+
void serialize(Object obj, OutputStream outputStream) throws SerializerException ;
3337

3438

3539
/**
@@ -43,4 +47,30 @@ public interface ISerializer {
4347
*/
4448

4549
<T> T deSerialize(byte[] param, Class<T> clazz) throws SerializerException;
50+
51+
52+
53+
54+
/**
55+
* 反序列化对象
56+
*
57+
* @param inputStream 需要反序列化的inputStream
58+
* @param clazz 反序列化成为的bean对象Class
59+
* @param <T> 反序列化成为的bean对象
60+
* @return 对象
61+
* @throws SerializerException 序列化异常
62+
*/
63+
64+
<T> T deSerialize(InputStream inputStream, Class<T> clazz) throws SerializerException;
65+
66+
67+
/**
68+
* 序列化对象
69+
*
70+
* @param obj 需要序更列化的对象
71+
* @return byte [] 序列号结果
72+
* @throws SerializerException 序列化异常
73+
*/
74+
byte[] serialize(Object obj) throws SerializerException;
75+
4676
}

tx-commons/src/main/java/com/codingapi/txlcn/commons/util/serializer/ProtostuffSerializer.java

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,7 @@
2222
import org.objenesis.Objenesis;
2323
import org.objenesis.ObjenesisStd;
2424

25-
import java.io.ByteArrayInputStream;
26-
import java.io.ByteArrayOutputStream;
25+
import java.io.*;
2726

2827
/**
2928
* @author lorne 2017/11/11
@@ -54,6 +53,22 @@ public byte[] serialize(Object obj) throws SerializerException {
5453
}
5554
}
5655

56+
57+
@Override
58+
public void serialize(Object obj, OutputStream outputStream) throws SerializerException {
59+
Class cls = obj.getClass();
60+
LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
61+
try {
62+
Schema schema = getSchema(cls);
63+
ProtostuffIOUtil.writeTo(outputStream, obj, schema, buffer);
64+
} catch (Exception e) {
65+
throw new SerializerException(e.getMessage(), e);
66+
}finally {
67+
buffer.clear();
68+
}
69+
}
70+
71+
5772
@Override
5873
public <T> T deSerialize(byte[] param, Class<T> cls) throws SerializerException {
5974
T object;
@@ -66,5 +81,18 @@ public <T> T deSerialize(byte[] param, Class<T> cls) throws SerializerException
6681
throw new SerializerException(e.getMessage(), e);
6782
}
6883
}
84+
85+
@Override
86+
public <T> T deSerialize(InputStream inputStream, Class<T> cls) throws SerializerException {
87+
T object;
88+
try{
89+
object = OBJENESIS.newInstance(cls);
90+
Schema schema = getSchema(cls);
91+
ProtostuffIOUtil.mergeFrom(inputStream, object, schema);
92+
return object;
93+
} catch (Exception e) {
94+
throw new SerializerException(e.getMessage(), e);
95+
}
96+
}
6997
}
7098

tx-commons/src/main/java/com/codingapi/txlcn/commons/util/serializer/SerializerContext.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@
1717

1818
import com.codingapi.txlcn.commons.exception.SerializerException;
1919

20+
import java.io.InputStream;
21+
import java.io.OutputStream;
22+
2023
/**
2124
* @author lorne 2018/12/31
2225
*
@@ -52,4 +55,14 @@ public byte[] serialize(Object obj) throws SerializerException {
5255
public <T> T deSerialize(byte[] param, Class<T> clazz) throws SerializerException {
5356
return protostuffSerializer.deSerialize(param,clazz);
5457
}
58+
59+
@Override
60+
public <T> T deSerialize(InputStream inputStream, Class<T> clazz) throws SerializerException {
61+
return protostuffSerializer.deSerialize(inputStream,clazz);
62+
}
63+
64+
@Override
65+
public void serialize(Object obj, OutputStream outputStream) throws SerializerException {
66+
protostuffSerializer.serialize(obj, outputStream);
67+
}
5568
}

tx-spi-message-netty/src/main/java/com/codingapi/txlcn/spi/message/netty/bean/RpcCmdContext.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,9 @@
3434
@Slf4j
3535
public class RpcCmdContext {
3636

37-
private RpcConfig rpcConfig;
37+
private int cacheSize = 1024;
38+
39+
private int waitTime = 1;
3840

3941
private static RpcCmdContext context = null;
4042

@@ -108,7 +110,7 @@ private RpcContent findRpcContent() {
108110
}
109111

110112
private RpcContent createRpcContent() {
111-
if (cacheList.size() < rpcConfig.getCacheSize()) {
113+
if (cacheList.size() < cacheSize) {
112114
RpcContent rpcContent = new RpcContent(getWaitTime());
113115
rpcContent.init();
114116
cacheList.add(rpcContent);
@@ -136,10 +138,11 @@ private void clearKey(String key) {
136138

137139

138140
public void setRpcConfig(RpcConfig rpcConfig) {
139-
this.rpcConfig = rpcConfig;
141+
cacheSize = rpcConfig.getCacheSize();
142+
waitTime = (int) rpcConfig.getWaitTime()/1000;
140143
}
141144

142145
public int getWaitTime() {
143-
return (int) (rpcConfig.getWaitTime() / 1000);
146+
return waitTime;
144147
}
145148
}

tx-spi-message-netty/src/main/java/com/codingapi/txlcn/spi/message/netty/bean/SocketManager.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ public class SocketManager {
5050

5151
private static SocketManager manager = null;
5252

53-
private RpcConfig rpcConfig;
53+
private long attrDelayTime = 1000*60;
5454

5555
private SocketManager() {
5656
channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
@@ -89,7 +89,7 @@ public void removeChannel(Channel channel) {
8989
executorService.schedule(() -> {
9090
String key = channel.remoteAddress().toString();
9191
appNames.remove(key);
92-
}, rpcConfig.getAttrDelayTime(), TimeUnit.MILLISECONDS);
92+
}, attrDelayTime, TimeUnit.MILLISECONDS);
9393
} catch (RejectedExecutionException ignored) {
9494
// caused down server.
9595
}
@@ -182,7 +182,7 @@ public void bindModuleName(String remoteKey, String moduleName) {
182182
}
183183

184184
public void setRpcConfig(RpcConfig rpcConfig) {
185-
this.rpcConfig = rpcConfig;
185+
attrDelayTime = rpcConfig.getAttrDelayTime();
186186
}
187187

188188
/**

tx-spi-message-netty/src/main/java/com/codingapi/txlcn/spi/message/netty/handler/NettyRpcClientHandlerInitHandler.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,6 @@
1919
import io.netty.channel.ChannelInitializer;
2020
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
2121
import io.netty.handler.codec.LengthFieldPrepender;
22-
import io.netty.handler.codec.serialization.ClassResolvers;
23-
import io.netty.handler.codec.serialization.ObjectDecoder;
24-
import io.netty.handler.codec.serialization.ObjectEncoder;
2522
import org.springframework.beans.factory.annotation.Autowired;
2623
import org.springframework.stereotype.Component;
2724

@@ -43,11 +40,14 @@ public class NettyRpcClientHandlerInitHandler extends ChannelInitializer<Channel
4340

4441
@Override
4542
protected void initChannel(Channel ch) throws Exception {
43+
4644
ch.pipeline().addLast(new LengthFieldPrepender(4, false));
47-
ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
45+
ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,
46+
0, 4, 0, 4));
47+
48+
ch.pipeline().addLast(new ObjectSerializerEncoder());
49+
ch.pipeline().addLast(new ObjectSerializerDecoder());
4850

49-
ch.pipeline().addLast(new ObjectDecoder(ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader())));
50-
ch.pipeline().addLast(new ObjectEncoder());
5151

5252
ch.pipeline().addLast(new RpcCmdDecoder());
5353
ch.pipeline().addLast(new RpcCmdEncoder());

tx-spi-message-netty/src/main/java/com/codingapi/txlcn/spi/message/netty/handler/NettyRpcServerHandlerInitHandler.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,6 @@
2020
import io.netty.channel.ChannelInitializer;
2121
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
2222
import io.netty.handler.codec.LengthFieldPrepender;
23-
import io.netty.handler.codec.serialization.ClassResolvers;
24-
import io.netty.handler.codec.serialization.ObjectDecoder;
25-
import io.netty.handler.codec.serialization.ObjectEncoder;
2623
import io.netty.handler.timeout.IdleStateHandler;
2724
import org.springframework.beans.factory.annotation.Autowired;
2825
import org.springframework.stereotype.Component;
@@ -56,8 +53,10 @@ protected void initChannel(Channel ch) throws Exception {
5653
ch.pipeline().addLast(new IdleStateHandler(managerProperties.getCheckTime(),
5754
managerProperties.getCheckTime(), managerProperties.getCheckTime(), TimeUnit.MILLISECONDS));
5855

59-
ch.pipeline().addLast(new ObjectDecoder(ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader())));
60-
ch.pipeline().addLast(new ObjectEncoder());
56+
57+
ch.pipeline().addLast(new ObjectSerializerEncoder());
58+
ch.pipeline().addLast(new ObjectSerializerDecoder());
59+
6160

6261
ch.pipeline().addLast(new RpcCmdDecoder());
6362
ch.pipeline().addLast(new RpcCmdEncoder());
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*
2+
* Copyright 2017-2019 CodingApi .
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.codingapi.txlcn.spi.message.netty.handler;
17+
18+
import com.codingapi.txlcn.commons.util.serializer.SerializerContext;
19+
import com.codingapi.txlcn.spi.message.netty.bean.NettyRpcCmd;
20+
import io.netty.buffer.ByteBuf;
21+
import io.netty.buffer.ByteBufInputStream;
22+
import io.netty.channel.ChannelHandlerContext;
23+
import io.netty.handler.codec.MessageToMessageDecoder;
24+
import lombok.extern.slf4j.Slf4j;
25+
26+
import java.util.List;
27+
28+
/**
29+
* @author lorne
30+
* @date 2019/1/19
31+
* @description
32+
*/
33+
@Slf4j
34+
public class ObjectSerializerDecoder extends MessageToMessageDecoder<ByteBuf> {
35+
36+
37+
@Override
38+
protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
39+
if(msg==null){
40+
return;
41+
}
42+
ByteBufInputStream byteBufInputStream = new ByteBufInputStream(msg, true);
43+
NettyRpcCmd object = SerializerContext.getInstance().deSerialize(byteBufInputStream, NettyRpcCmd.class);
44+
out.add(object);
45+
}
46+
47+
48+
}
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* Copyright 2017-2019 CodingApi .
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.codingapi.txlcn.spi.message.netty.handler;
17+
18+
import com.codingapi.txlcn.commons.util.serializer.SerializerContext;
19+
import io.netty.buffer.ByteBuf;
20+
import io.netty.buffer.ByteBufOutputStream;
21+
import io.netty.channel.ChannelHandlerContext;
22+
import io.netty.handler.codec.MessageToByteEncoder;
23+
import lombok.extern.slf4j.Slf4j;
24+
25+
import java.io.Serializable;
26+
27+
/**
28+
* @author lorne
29+
* @date 2019/1/19
30+
* @description
31+
*/
32+
@Slf4j
33+
public class ObjectSerializerEncoder extends MessageToByteEncoder<Serializable> {
34+
35+
36+
@Override
37+
protected void encode(ChannelHandlerContext ctx, Serializable msg, ByteBuf out) throws Exception {
38+
ByteBufOutputStream bout = new ByteBufOutputStream(out);
39+
SerializerContext.getInstance().serialize(msg,bout);
40+
}
41+
42+
43+
}

tx-spi-message-netty/src/main/java/com/codingapi/txlcn/spi/message/netty/impl/NettyRpcClientInitializer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ public synchronized void connect(SocketAddress socketAddress) {
8484
b.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000);
8585
b.handler(nettyRpcClientHandlerInitHandler);
8686
ChannelFuture channelFuture = b.connect(socketAddress).syncUninterruptibly();
87-
log.info("client -> {} , transactionState:{}", socketAddress, channelFuture.isSuccess());
87+
log.info("client -> {} , state :{}", socketAddress, channelFuture.isSuccess());
8888
connected = true;
8989
break;
9090

0 commit comments

Comments
 (0)