forked from SpringMT/zstd-ruby
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathstreaming_decompress.c
More file actions
153 lines (134 loc) · 4.26 KB
/
streaming_decompress.c
File metadata and controls
153 lines (134 loc) · 4.26 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
#include "common.h"
struct streaming_decompress_t {
ZSTD_DCtx* dctx;
VALUE buf;
size_t buf_size;
};
static void
streaming_decompress_mark(void *p)
{
struct streaming_decompress_t *sd = p;
#ifdef HAVE_RB_GC_MARK_MOVABLE
rb_gc_mark_movable(sd->buf);
#else
rb_gc_mark(sd->buf);
#endif
}
static void
streaming_decompress_free(void *p)
{
struct streaming_decompress_t *sd = p;
ZSTD_DCtx* dctx = sd->dctx;
if (dctx != NULL) {
ZSTD_freeDCtx(dctx);
}
xfree(sd);
}
static size_t
streaming_decompress_memsize(const void *p)
{
return sizeof(struct streaming_decompress_t);
}
#ifdef HAVE_RB_GC_MARK_MOVABLE
static void
streaming_decompress_compact(void *p)
{
struct streaming_decompress_t *sd = p;
sd->buf = rb_gc_location(sd->buf);
}
#endif
static const rb_data_type_t streaming_decompress_type = {
"streaming_decompress",
{
streaming_decompress_mark,
streaming_decompress_free,
streaming_decompress_memsize,
#ifdef HAVE_RB_GC_MARK_MOVABLE
streaming_decompress_compact,
#endif
},
0, 0, RUBY_TYPED_FREE_IMMEDIATELY | RUBY_TYPED_WB_PROTECTED
};
static VALUE
rb_streaming_decompress_allocate(VALUE klass)
{
struct streaming_decompress_t* sd;
VALUE obj = TypedData_Make_Struct(klass, struct streaming_decompress_t, &streaming_decompress_type, sd);
sd->dctx = NULL;
RB_OBJ_WRITE(obj, &sd->buf, Qnil);
sd->buf_size = 0;
return obj;
}
static VALUE
rb_streaming_decompress_initialize(int argc, VALUE *argv, VALUE obj)
{
VALUE kwargs;
rb_scan_args(argc, argv, "00:", &kwargs);
struct streaming_decompress_t* sd;
TypedData_Get_Struct(obj, struct streaming_decompress_t, &streaming_decompress_type, sd);
size_t const buffOutSize = ZSTD_DStreamOutSize();
ZSTD_DCtx* dctx = ZSTD_createDCtx();
if (dctx == NULL) {
rb_raise(rb_eRuntimeError, "%s", "ZSTD_createDCtx error");
}
set_decompress_params(dctx, kwargs);
sd->dctx = dctx;
RB_OBJ_WRITE(obj, &sd->buf, rb_str_new(NULL, buffOutSize));
sd->buf_size = buffOutSize;
return obj;
}
static VALUE
rb_streaming_decompress_decompress(VALUE obj, VALUE src)
{
StringValue(src);
const char* input_data = RSTRING_PTR(src);
size_t input_size = RSTRING_LEN(src);
ZSTD_inBuffer input = { input_data, input_size, 0 };
struct streaming_decompress_t* sd;
TypedData_Get_Struct(obj, struct streaming_decompress_t, &streaming_decompress_type, sd);
VALUE result = rb_str_new(0, 0);
while (input.pos < input.size) {
const char* output_data = RSTRING_PTR(sd->buf);
ZSTD_outBuffer output = { (void*)output_data, sd->buf_size, 0 };
size_t const ret = zstd_stream_decompress(sd->dctx, &output, &input, false);
if (ZSTD_isError(ret)) {
rb_raise(rb_eRuntimeError, "decompress error error code: %s", ZSTD_getErrorName(ret));
}
if (output.pos > 0) {
rb_str_cat(result, output.dst, output.pos);
}
if (ret == 0 && output.pos == 0) {
break;
}
}
return result;
}
static VALUE
rb_streaming_decompress_decompress_with_pos(VALUE obj, VALUE src)
{
StringValue(src);
const char* input_data = RSTRING_PTR(src);
size_t input_size = RSTRING_LEN(src);
ZSTD_inBuffer input = { input_data, input_size, 0 };
struct streaming_decompress_t* sd;
TypedData_Get_Struct(obj, struct streaming_decompress_t, &streaming_decompress_type, sd);
const char* output_data = RSTRING_PTR(sd->buf);
VALUE result = rb_str_new(0, 0);
ZSTD_outBuffer output = { (void*)output_data, sd->buf_size, 0 };
size_t const ret = zstd_stream_decompress(sd->dctx, &output, &input, false);
if (ZSTD_isError(ret)) {
rb_raise(rb_eRuntimeError, "decompress error error code: %s", ZSTD_getErrorName(ret));
}
rb_str_cat(result, output.dst, output.pos);
return rb_ary_new_from_args(2, result, ULONG2NUM(input.pos));
}
extern VALUE rb_mZstd, cStreamingDecompress;
void
zstd_ruby_streaming_decompress_init(void)
{
VALUE cStreamingDecompress = rb_define_class_under(rb_mZstd, "StreamingDecompress", rb_cObject);
rb_define_alloc_func(cStreamingDecompress, rb_streaming_decompress_allocate);
rb_define_method(cStreamingDecompress, "initialize", rb_streaming_decompress_initialize, -1);
rb_define_method(cStreamingDecompress, "decompress", rb_streaming_decompress_decompress, 1);
rb_define_method(cStreamingDecompress, "decompress_with_pos", rb_streaming_decompress_decompress_with_pos, 1);
}