@@ -110,6 +110,61 @@ impl<'b, T: ValueTrait> Paging<'b, T> {
110110 Ok ( ( ) )
111111 }
112112
113+ pub fn insert ( & mut self , index : LocationOffset , value : T ) -> Result < ( ) > {
114+ let data = serialize ( & value) ?;
115+
116+ let required_space = REQUIRED_SPACE as u16 ;
117+ let free_space = self . page . free_space ( ) ;
118+
119+ if required_space > free_space {
120+ return Err ( Error :: NotEnoughSpace ) ;
121+ }
122+
123+ let free_space = self . page . free_space ( ) ;
124+
125+ if data. len ( ) + ( TUPLE_POINTER_SIZE as usize ) < free_space as usize {
126+ self . page . insert ( index, & data, 0 ) ?;
127+
128+ if REQUIRED_SPACE as u16 >= self . page . free_space ( ) {
129+ self . page . header . set ( PageFlag :: Full ) ;
130+ }
131+
132+ self . block
133+ . write_page ( self . page_number as u64 , & self . page )
134+ . unwrap ( ) ;
135+
136+ return Ok ( ( ) ) ;
137+ }
138+
139+ // split data to fit into the page
140+ let ( data, overflow_data) = data. split_at (
141+ self . page . free_space ( ) as usize - TUPLE_POINTER_SIZE as usize - OVERFLOW_SIZE ,
142+ ) ;
143+
144+ let ( overflow_page, overflow_page_number) = self . allocate_overflow_page (
145+ overflow_data. len ( ) as u16 + TUPLE_POINTER_SIZE + OVERFLOW_SIZE as u16 ,
146+ ) ?;
147+
148+ let ( offset, overflow_page_number) =
149+ self . write_recursively ( overflow_data, overflow_page, overflow_page_number) ?;
150+
151+ let overflow = Overflow :: new ( overflow_page_number, offset) ;
152+
153+ let data_to_write = [ & overflow. to_bytes ( ) , data] . concat ( ) ;
154+
155+ // write the overflow data to the current page
156+ self . page
157+ . insert ( index, & data_to_write, TuplePointerFlags :: OverflowItem as u8 ) ?;
158+
159+ self . page . header . set ( PageFlag :: Full ) ;
160+
161+ self . block
162+ . write_page ( self . page_number as u64 , & self . page )
163+ . unwrap ( ) ;
164+
165+ Ok ( ( ) )
166+ }
167+
113168 fn write_recursively (
114169 & mut self ,
115170 data : & [ u8 ] ,
@@ -266,8 +321,8 @@ impl<'b, T: ValueTrait> Paging<'b, T> {
266321 Ok ( ( overflow_page, overflow_page_number) )
267322 }
268323
269- pub fn page ( & self ) -> & Page {
270- & self . page
324+ pub fn page ( & mut self ) -> & mut Page {
325+ & mut self . page
271326 }
272327
273328 pub fn page_number ( & self ) -> PageNumber {
@@ -354,4 +409,39 @@ mod paging_tests {
354409 let read_value = paging. get ( 0 ) . unwrap ( ) ;
355410 assert_eq ! ( read_value, None ) ;
356411 }
412+
413+ #[ test]
414+ fn remove_non_overflow_data ( ) {
415+ let mut block = Block :: new ( "test_data/paging/remove_non_overflow_data.db" ) . unwrap ( ) ;
416+ let page = PageBuilder :: new ( ) . build ( ) . unwrap ( ) ;
417+ block. write_new_page ( & page) . unwrap ( ) ;
418+
419+ let mut paging = Paging :: < String > :: new ( & mut block, 0 , page) ;
420+
421+ let value = "Hello, world!" . to_string ( ) ;
422+ paging. write ( value. clone ( ) ) . unwrap ( ) ;
423+
424+ paging. remove ( 0 ) . unwrap ( ) ;
425+
426+ let read_value = paging. get ( 0 ) . unwrap ( ) ;
427+ assert_eq ! ( read_value, None ) ;
428+ }
429+
430+ #[ test]
431+ fn insert_overflow_data ( ) {
432+ let mut block = Block :: new ( "test_data/paging/insert_overflow_data.db" ) . unwrap ( ) ;
433+ let page = PageBuilder :: new ( ) . build ( ) . unwrap ( ) ;
434+ block. write_new_page ( & page) . unwrap ( ) ;
435+
436+ let mut paging = Paging :: < String > :: new ( & mut block, 0 , page) ;
437+
438+ let value = "Hello, world!" ;
439+ paging. write ( value. to_owned ( ) ) . unwrap ( ) ;
440+
441+ let value = "Hello, John Doe!" . repeat ( 1000 ) ;
442+ paging. insert ( 0 , value. clone ( ) ) . unwrap ( ) ;
443+
444+ let read_value = paging. get ( 0 ) . unwrap ( ) . unwrap ( ) ;
445+ assert_eq ! ( read_value, value) ;
446+ }
357447}
0 commit comments