Skip to content

Commit 322e9a2

Browse files
guyinyouguyinyou
andauthored
golang: support zlib decode (#957)
Co-authored-by: guyinyou <guyinyou.gyy@alibaba-inc.com>
1 parent f3c288e commit 322e9a2

3 files changed

Lines changed: 64 additions & 1 deletion

File tree

golang/message.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -231,7 +231,7 @@ func fromProtobuf_MessageView2(message *v2.Message, messageQueue *v2.MessageQueu
231231
bodyEncoding := systemProperties.GetBodyEncoding()
232232
switch bodyEncoding {
233233
case v2.Encoding_GZIP:
234-
unCompressBody, err := utils.GZIPDecode(message.GetBody())
234+
unCompressBody, err := utils.AutoDecode(message.GetBody())
235235
if err != nil {
236236
sugarBaseLogger.Errorf("failed to uncompress message body, topic=%s, messageId=%s, err=%w", mv.topic, mv.messageId, err)
237237
corrupted = true

golang/pkg/utils/utils.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ package utils
2020
import (
2121
"bytes"
2222
"compress/gzip"
23+
"compress/zlib"
2324
"context"
2425
"encoding/hex"
2526
"fmt"
@@ -140,6 +141,26 @@ func MatchMessageType(mq *v2.MessageQueue, messageType v2.MessageType) bool {
140141
return false
141142
}
142143

144+
func AutoDecode(in []byte) ([]byte, error) {
145+
if len(in) < 2 {
146+
return in, fmt.Errorf("unknown format")
147+
}
148+
if in[0] == 0x1f && in[1] == 0x8b {
149+
return GZIPDecode(in)
150+
}
151+
return ZlibDecode(in)
152+
}
153+
154+
func ZlibDecode(in []byte) ([]byte, error) {
155+
reader, err := zlib.NewReader(bytes.NewReader(in))
156+
if err != nil {
157+
var out []byte
158+
return out, err
159+
}
160+
defer reader.Close()
161+
return ioutil.ReadAll(reader)
162+
}
163+
143164
func GZIPDecode(in []byte) ([]byte, error) {
144165
reader, err := gzip.NewReader(bytes.NewReader(in))
145166
if err != nil {

golang/pkg/utils/utils_test.go

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package utils
1919

2020
import (
2121
"compress/gzip"
22+
"compress/zlib"
2223
"testing"
2324

2425
v2 "github.com/apache/rocketmq-clients/golang/v5/protocol/v2"
@@ -111,6 +112,33 @@ func TestMatchMessageType(t *testing.T) {
111112
}
112113
}
113114

115+
func TestAutoDecode(t *testing.T) {
116+
_, err := AutoDecode([]byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9})
117+
if err != zlib.ErrHeader {
118+
t.Error()
119+
}
120+
_, err = AutoDecode([]byte{0})
121+
if err == nil {
122+
t.Error()
123+
}
124+
// gzip
125+
bytes, err := AutoDecode([]byte{31, 139, 8, 0, 0, 0, 0, 0, 0, 255, 42, 202, 79, 206, 78, 45, 201, 45, 212, 77, 206, 201, 76, 205, 43, 209, 77, 207, 7, 0, 0, 0, 255, 255, 1, 0, 0, 255, 255, 97, 36, 132, 114, 18, 0, 0, 0})
126+
if err != nil {
127+
t.Error()
128+
}
129+
if string(bytes) != "rocketmq-client-go" {
130+
t.Error()
131+
}
132+
// zlib
133+
bytes, err = AutoDecode([]byte{120, 156, 42, 202, 79, 206, 78, 45, 201, 45, 212, 77, 206, 201, 76, 205, 43, 209, 77, 207, 7, 4, 0, 0, 255, 255, 68, 223, 7, 22})
134+
if err != nil {
135+
t.Error()
136+
}
137+
if string(bytes) != "rocketmq-client-go" {
138+
t.Error()
139+
}
140+
}
141+
114142
func TestGZIPDecode(t *testing.T) {
115143
_, err := GZIPDecode([]byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9})
116144
if err != gzip.ErrHeader {
@@ -125,6 +153,20 @@ func TestGZIPDecode(t *testing.T) {
125153
}
126154
}
127155

156+
func TestZlibDecode(t *testing.T) {
157+
_, err := ZlibDecode([]byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9})
158+
if err != zlib.ErrHeader {
159+
t.Error()
160+
}
161+
bytes, err := ZlibDecode([]byte{120, 156, 42, 202, 79, 206, 78, 45, 201, 45, 212, 77, 206, 201, 76, 205, 43, 209, 77, 207, 7, 4, 0, 0, 255, 255, 68, 223, 7, 22})
162+
if err != nil {
163+
t.Error()
164+
}
165+
if string(bytes) != "rocketmq-client-go" {
166+
t.Error()
167+
}
168+
}
169+
128170
func TestSelectAnAddress(t *testing.T) {
129171
if SelectAnAddress(nil) != nil {
130172
t.Error()

0 commit comments

Comments
 (0)