Skip to content

Commit 811e70b

Browse files
guyinyouguyinyou
andauthored
golang: support LZ4 and ZSTD compression algorithm (#958)
* support LZ4 and ZSTD compression algorithm * update zstd * update go mod --------- Co-authored-by: guyinyou <guyinyou.gyy@alibaba-inc.com>
1 parent 322e9a2 commit 811e70b

4 files changed

Lines changed: 129 additions & 13 deletions

File tree

golang/go.mod

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,9 @@ require (
66
github.com/go-playground/validator/v10 v10.11.0
77
github.com/golang/mock v1.6.0
88
github.com/google/uuid v1.3.0
9+
github.com/klauspost/compress v1.16.7
910
github.com/natefinch/lumberjack v2.0.0+incompatible
11+
github.com/pierrec/lz4 v2.6.1+incompatible
1012
github.com/prashantv/gostub v1.1.0
1113
github.com/stretchr/testify v1.7.1
1214
go.uber.org/zap v1.21.0
@@ -19,8 +21,9 @@ require (
1921
require (
2022
github.com/benbjohnson/clock v1.3.0 // indirect
2123
github.com/davecgh/go-spew v1.1.1 // indirect
24+
github.com/frankban/quicktest v1.14.6 // indirect
2225
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
23-
github.com/google/go-cmp v0.5.8 // indirect
26+
github.com/google/go-cmp v0.5.9 // indirect
2427
github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect
2528
github.com/pkg/errors v0.9.1 // indirect
2629
github.com/pmezard/go-difflib v1.0.0 // indirect
@@ -36,7 +39,7 @@ require (
3639
github.com/dchest/siphash v1.2.3
3740
github.com/go-playground/locales v0.14.0 // indirect
3841
github.com/go-playground/universal-translator v0.18.0 // indirect
39-
github.com/golang/protobuf v1.5.2 // indirect
42+
github.com/golang/protobuf v1.5.2
4043
github.com/leodido/go-urn v1.2.1 // indirect
4144
github.com/valyala/fastrand v1.1.0
4245
go.opencensus.io v0.22.5

golang/go.sum

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1m
3232
github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk=
3333
github.com/envoyproxy/go-control-plane v0.10.2-0.20220325020618-49ff273808a1/go.mod h1:KJwIaB5Mv44NWtYuAOFCVOjcI94vtpEz2JU/D2v6IjE=
3434
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
35+
github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8=
36+
github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0=
3537
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
3638
github.com/go-playground/assert/v2 v2.0.1 h1:MsBgLAaY856+nPRTKrp3/OZK38U/wa0CcBYNjji3q3A=
3739
github.com/go-playground/assert/v2 v2.0.1/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4=
@@ -72,8 +74,8 @@ github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
7274
github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
7375
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
7476
github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
75-
github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg=
76-
github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
77+
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
78+
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
7779
github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs=
7880
github.com/google/pprof v0.0.0-20181206194817-3ea8567a2e57/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc=
7981
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
@@ -87,10 +89,21 @@ github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFb
8789
github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
8890
github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
8991
github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU=
92+
github.com/klauspost/compress v1.11.1 h1:bPb7nMRdOZYDrpPMTA3EInUQrdgoBinqUuSwlGdKDdE=
93+
github.com/klauspost/compress v1.11.1/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
94+
github.com/klauspost/compress v1.14.0 h1:PMLEuM4T/QQJPbP9IviImySv8Ehv/vc8pB+dBLUr+74=
95+
github.com/klauspost/compress v1.14.0/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
96+
github.com/klauspost/compress v1.16.0 h1:iULayQNOReoYUe+1qtKOqw9CwJv3aNQu8ivo7lw1HU4=
97+
github.com/klauspost/compress v1.16.0/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
98+
github.com/klauspost/compress v1.16.7 h1:2mk3MPGNzKyxErAw8YaohYh69+pa4sIQSC0fPGCFR9I=
99+
github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
100+
github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo=
101+
github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ=
90102
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
91103
github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
92-
github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0=
93104
github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk=
105+
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
106+
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
94107
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
95108
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
96109
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
@@ -99,6 +112,8 @@ github.com/leodido/go-urn v1.2.1 h1:BqpAaACuzVSgi/VLzGZIobT2z4v53pjosyNd9Yv6n/w=
99112
github.com/leodido/go-urn v1.2.1/go.mod h1:zt4jvISO2HfUBqxjfIshjdMTYS56ZS/qv49ictyFfxY=
100113
github.com/natefinch/lumberjack v2.0.0+incompatible h1:4QJd3OLAMgj7ph+yZTuX13Ld4UpgHp07nNdFX7mqFfM=
101114
github.com/natefinch/lumberjack v2.0.0+incompatible/go.mod h1:Wi9p2TTF5DG5oU+6YfsmYQpsTIOm0B1VNzQg9Mw6nPk=
115+
github.com/pierrec/lz4 v2.6.1+incompatible h1:9UY3+iC23yxF0UfGaYrGplQ+79Rg+h/q9FV9ix19jjM=
116+
github.com/pierrec/lz4 v2.6.1+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
102117
github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA=
103118
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
104119
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
@@ -111,8 +126,9 @@ github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:
111126
github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
112127
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
113128
github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc=
114-
github.com/rogpeppe/go-internal v1.8.0 h1:FCbCCtXNOY3UtUuHUYaghJg4y7Fd14rXifAYUAtL9R8=
115129
github.com/rogpeppe/go-internal v1.8.0/go.mod h1:WmiCO8CzOY8rg0OYDC4/i/2WRWAB6poM+XZ2dLUbcbE=
130+
github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8=
131+
github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs=
116132
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
117133
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
118134
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=

golang/pkg/utils/utils.go

Lines changed: 58 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -36,11 +36,23 @@ import (
3636

3737
"github.com/apache/rocketmq-clients/golang/v5/metadata"
3838
v2 "github.com/apache/rocketmq-clients/golang/v5/protocol/v2"
39+
"github.com/klauspost/compress/zstd"
40+
"github.com/pierrec/lz4"
3941
"github.com/valyala/fastrand"
4042
"go.opencensus.io/trace"
4143
MD "google.golang.org/grpc/metadata"
4244
)
4345

46+
type CompressionType int32
47+
48+
const (
49+
Unknown CompressionType = 0
50+
GZIP CompressionType = 1
51+
Zlib CompressionType = 2
52+
LZ4 CompressionType = 3
53+
ZSTD CompressionType = 4
54+
)
55+
4456
func Mod(n int32, m int) int {
4557
if int32(m) <= 0 {
4658
return 0
@@ -141,14 +153,40 @@ func MatchMessageType(mq *v2.MessageQueue, messageType v2.MessageType) bool {
141153
return false
142154
}
143155

144-
func AutoDecode(in []byte) ([]byte, error) {
145-
if len(in) < 2 {
146-
return in, fmt.Errorf("unknown format")
156+
func MatchCompressionAlgorithm(in []byte) CompressionType {
157+
if in == nil {
158+
return Unknown
159+
}
160+
if len(in) >= 2 {
161+
if in[0] == 0x78 {
162+
return Zlib
163+
} else if in[0] == 0x1f && in[1] == 0x8b {
164+
return GZIP
165+
}
147166
}
148-
if in[0] == 0x1f && in[1] == 0x8b {
167+
if len(in) >= 4 {
168+
if in[0] == 0x04 && in[1] == 0x22 && in[2] == 0x4D && in[3] == 0x18 {
169+
return LZ4
170+
} else if in[0] == 0x28 && in[1] == 0xB5 && in[2] == 0x2F && in[3] == 0xFD {
171+
return ZSTD
172+
}
173+
}
174+
return Unknown
175+
}
176+
177+
func AutoDecode(in []byte) ([]byte, error) {
178+
compressionType := MatchCompressionAlgorithm(in)
179+
switch compressionType {
180+
case Zlib:
181+
return ZlibDecode(in)
182+
case GZIP:
149183
return GZIPDecode(in)
184+
case LZ4:
185+
return Lz4Decode(in)
186+
case ZSTD:
187+
return ZstdDecode(in)
150188
}
151-
return ZlibDecode(in)
189+
return in, fmt.Errorf("unknown format")
152190
}
153191

154192
func ZlibDecode(in []byte) ([]byte, error) {
@@ -161,6 +199,21 @@ func ZlibDecode(in []byte) ([]byte, error) {
161199
return ioutil.ReadAll(reader)
162200
}
163201

202+
func Lz4Decode(in []byte) ([]byte, error) {
203+
reader := lz4.NewReader(bytes.NewReader(in))
204+
return ioutil.ReadAll(reader)
205+
}
206+
207+
func ZstdDecode(in []byte) ([]byte, error) {
208+
reader, err := zstd.NewReader(bytes.NewReader(in))
209+
if err != nil {
210+
var out []byte
211+
return out, err
212+
}
213+
defer reader.Close()
214+
return ioutil.ReadAll(reader)
215+
}
216+
164217
func GZIPDecode(in []byte) ([]byte, error) {
165218
reader, err := gzip.NewReader(bytes.NewReader(in))
166219
if err != nil {

golang/pkg/utils/utils_test.go

Lines changed: 46 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -114,10 +114,10 @@ func TestMatchMessageType(t *testing.T) {
114114

115115
func TestAutoDecode(t *testing.T) {
116116
_, err := AutoDecode([]byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9})
117-
if err != zlib.ErrHeader {
117+
if err == nil {
118118
t.Error()
119119
}
120-
_, err = AutoDecode([]byte{0})
120+
_, err = AutoDecode([]byte{78, 0})
121121
if err == nil {
122122
t.Error()
123123
}
@@ -137,6 +137,22 @@ func TestAutoDecode(t *testing.T) {
137137
if string(bytes) != "rocketmq-client-go" {
138138
t.Error()
139139
}
140+
// lz4
141+
bytes, err = AutoDecode([]byte{4, 34, 77, 24, 100, 112, 185, 18, 0, 0, 128, 114, 111, 99, 107, 101, 116, 109, 113, 45, 99, 108, 105, 101, 110, 116, 45, 103, 111, 0, 0, 0, 0, 248, 183, 23, 47})
142+
if err != nil {
143+
t.Error()
144+
}
145+
if string(bytes) != "rocketmq-client-go" {
146+
t.Error()
147+
}
148+
// zstd
149+
bytes, err = AutoDecode([]byte{40, 181, 47, 253, 32, 18, 145, 0, 0, 114, 111, 99, 107, 101, 116, 109, 113, 45, 99, 108, 105, 101, 110, 116, 45, 103, 111})
150+
if err != nil {
151+
t.Error()
152+
}
153+
if string(bytes) != "rocketmq-client-go" {
154+
t.Error()
155+
}
140156
}
141157

142158
func TestGZIPDecode(t *testing.T) {
@@ -167,6 +183,34 @@ func TestZlibDecode(t *testing.T) {
167183
}
168184
}
169185

186+
func TestLz4Decode(t *testing.T) {
187+
_, err := Lz4Decode([]byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9})
188+
if err == nil {
189+
t.Error()
190+
}
191+
bytes, err := Lz4Decode([]byte{4, 34, 77, 24, 100, 112, 185, 18, 0, 0, 128, 114, 111, 99, 107, 101, 116, 109, 113, 45, 99, 108, 105, 101, 110, 116, 45, 103, 111, 0, 0, 0, 0, 248, 183, 23, 47})
192+
if err != nil {
193+
t.Error()
194+
}
195+
if string(bytes) != "rocketmq-client-go" {
196+
t.Error()
197+
}
198+
}
199+
200+
func TestZstdDecode(t *testing.T) {
201+
_, err := ZstdDecode([]byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9})
202+
if err == nil {
203+
t.Error()
204+
}
205+
bytes, err := ZstdDecode([]byte{40, 181, 47, 253, 32, 18, 145, 0, 0, 114, 111, 99, 107, 101, 116, 109, 113, 45, 99, 108, 105, 101, 110, 116, 45, 103, 111})
206+
if err != nil {
207+
t.Error()
208+
}
209+
if string(bytes) != "rocketmq-client-go" {
210+
t.Error()
211+
}
212+
}
213+
170214
func TestSelectAnAddress(t *testing.T) {
171215
if SelectAnAddress(nil) != nil {
172216
t.Error()

0 commit comments

Comments
 (0)