Skip to content

Commit 02879eb

Browse files
committed
Implemented hash join
1 parent fa62a91 commit 02879eb

14 files changed

Lines changed: 577 additions & 30 deletions

File tree

CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@ add_library(SR STATIC
4040
include/buffer.h
4141
src/queue.c
4242
include/queue.h
43+
include/hash_table.h
44+
src/hash_table.c
4345
)
4446

4547
enable_testing()

include/hash_table.h

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
//
2+
// Created by Seppe Degryse on 15/04/2025.
3+
//
4+
5+
#ifndef HASH_TABLE_H
6+
#define HASH_TABLE_H
7+
8+
#include <stdio.h>
9+
#include <stdlib.h>
10+
11+
#include <data.h>
12+
13+
typedef struct Bucket {
14+
triple_t **tuples;
15+
size_t count;
16+
size_t capacity;
17+
} bucket_t;
18+
19+
typedef struct HashTable {
20+
bucket_t *table;
21+
size_t size;
22+
} hash_table_t;
23+
24+
25+
uint32_t hash(uint32_t key, size_t table_size);
26+
27+
hash_table_t create_table(size_t num_elements);
28+
29+
void free_table(const hash_table_t *ht);
30+
31+
bool insert(hash_table_t *ht, uint32_t offset, triple_t *el);
32+
33+
bucket_t *contains(const hash_table_t *ht, uint32_t offset_in1, uint32_t offset_in2, triple_t *el);
34+
35+
#endif //HASH_TABLE_H

include/memory.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,14 @@ void* tracked_malloc(size_t size);
1111

1212
void* tracked_realloc(void* ptr, size_t new_size);
1313

14+
void tracked_free(void* ptr, size_t size);
15+
1416
size_t get_alloc_count();
1517

1618
size_t get_total_allocated();
1719

20+
size_t get_peak_allocated();
21+
1822
void reset_memory_counter();
1923

2024
#endif //MEMORY_H

include/operator.h

Lines changed: 20 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
enum OPERATORS {
1313
JOIN,
14+
HASH_JOIN,
1415
CARTESIAN,
1516
FILTER,
1617
WINDOW,
@@ -22,14 +23,21 @@ typedef bool (*join_check_t)(triple_t in1, triple_t in2);
2223
typedef bool (*filter_check_t)(triple_t in);
2324

2425
typedef struct JoinParams {
25-
uint8_t size;
26+
uint8_t size;
2627
join_check_t *checks;
2728
} join_params_t;
2829

2930
/**
 * Parameters of a CARTESIAN join operator.
 */
typedef struct CartJoinParams {
    double probability; /* NOTE(review): meaning and valid range are not visible here -- confirm with the executor */
} cart_join_params_t;
3233

34+
/**
 * Parameters of a HASH_JOIN operator: one (predicate, offset) pair per input.
 *
 * NOTE(review): the offsets are presumably the uint32_t word offsets handed
 * to insert()/contains() in hash_table.c, and the predicates presumably
 * identify the triples that take part in the join -- confirm with the
 * executor code.
 */
typedef struct HashJoinParams {
    uint32_t predicate_in1; /* predicate associated with the first input */
    uint8_t offset_in1;     /* key offset inside tuples of the first input */
    uint32_t predicate_in2; /* predicate associated with the second input */
    uint8_t offset_in2;     /* key offset inside tuples of the second input */
} hash_join_params_t;
40+
3341
typedef struct FilterParams {
3442
uint8_t size;
3543
filter_check_t *checks;
@@ -48,18 +56,19 @@ typedef struct WindowParams {
4856
} window_params_t;
4957

5058
typedef union Parameters {
51-
struct JoinParams join;
52-
struct CartJoinParams cart_join;
53-
struct FilterParams filter;
54-
struct SelectParams select;
55-
struct WindowParams window;
59+
struct JoinParams join;
60+
struct HashJoinParams hash_join;
61+
struct CartJoinParams cart_join;
62+
struct FilterParams filter;
63+
struct SelectParams select;
64+
struct WindowParams window;
5665
} parameter_t;
5766

5867
typedef struct Operator {
59-
enum OPERATORS type;
60-
struct Operator *left;
61-
struct Operator *right;
62-
union Parameters params;
68+
enum OPERATORS type;
69+
struct Operator *left;
70+
struct Operator *right;
71+
union Parameters params;
6372
} operator_t;
6473

6574
#endif //OPERATOR_H

include/query.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
#include <pthread.h>
1313
#include <stdatomic.h>
1414

15+
#include "hash_table.h"
16+
1517

1618
typedef struct ExecutionStep {
1719
const operator_t *operator_;
@@ -63,6 +65,9 @@ void flatten_query(const operator_t* operator_, spsc_queue_t *results, uint8_t i
6365
void join_triple_copy(const data_t *src1, uint32_t index1,
6466
const data_t *src2, uint32_t index2, data_t *dest);
6567

68+
void join_bucket_copy(const data_t *src1, bucket_t *bucket,
69+
const data_t *src2, uint32_t index2, data_t *dest);
70+
6671
bool join_check(const data_t *src1, uint32_t index1,
6772
const data_t *src2, uint32_t index2, join_params_t check);
6873

include/queue.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,4 +27,6 @@ bool spsc_is_empty(spsc_queue_t *q);
2727

2828
void empty_queue(spsc_queue_t *q);
2929

30+
void empty_queue_ndata(spsc_queue_t *q);
31+
3032
#endif //SPSC_QUEUE_LIBRARY_H

src/data.c

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,12 @@
77

88
#include <assert.h>
99
#include <stdlib.h>
10-
#include <stdbool.h>
1110

11+
#include "hash_table.h"
1212
#include "memory.h"
1313

14-
#define malloc(size) tracked_malloc(size)
15-
#define realloc(ptr, size) tracked_realloc(ptr, size)
14+
#define malloc(size) malloc(size)
15+
#define realloc(ptr, size) realloc(ptr, size)
1616

1717
void join_triple_copy(const data_t *src1, const uint32_t index1,
1818
const data_t *src2, const uint32_t index2, data_t *dest)
@@ -32,6 +32,27 @@ void join_triple_copy(const data_t *src1, const uint32_t index1,
3232
}
3333

3434

35+
/**
 * Append one joined row to `dest` for every tuple pointer held in `bucket`.
 *
 * Each output row consists of the `src1->width` elements found at the
 * bucket's tuple pointer, followed by the `src2->width` elements of `src2`
 * starting at `index2`. `dest->size` is incremented once per bucket entry.
 *
 * @param src1   first input; only its row width is read
 * @param bucket bucket whose tuples form the left half of every output row
 * @param src2   second input, source of the right half
 * @param index2 element index in `src2->data` where the right-hand row starts
 * @param dest   output buffer, extended through PUSH_TO_BUFFER
 */
void join_bucket_copy(const data_t *src1, bucket_t *bucket,
                      const data_t *src2, uint32_t index2, data_t *dest)
{
    /* Element position of the next write: size * width of the destination. */
    uint32_t write_pos = dest->size * dest->width;

    size_t entry = 0;
    while (entry < bucket->count) {
        /* Left half: the row referenced by this bucket entry. */
        for (uint32_t col = 0; col != src1->width; ++col) {
            PUSH_TO_BUFFER(dest, write_pos++, bucket->tuples[entry][col]);
        }

        /* Right half: the probing row of the second input. */
        for (uint32_t col = 0; col != src2->width; ++col) {
            PUSH_TO_BUFFER(dest, write_pos++, src2->data[index2 + col]);
        }

        dest->size++;
        ++entry;
    }
}
54+
55+
3556
bool join_check(const data_t *src1, const uint32_t index1,
3657
const data_t *src2, const uint32_t index2, const join_params_t check)
3758
{

src/hash_table.c

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
//
2+
// Created by Seppe Degryse on 15/04/2025.
3+
//
4+
#include "hash_table.h"
5+
6+
#include <assert.h>
7+
8+
9+
/**
 * Map a 32-bit key to a slot index in a table of `table_size` slots.
 *
 * @param key        value to hash
 * @param table_size number of slots in the table
 * @return `key` modulo `table_size`; 0 when `table_size` is 0, because a
 *         modulo by zero would be undefined behaviour
 */
uint32_t hash(const uint32_t key, const size_t table_size)
{
    if (table_size == 0) {
        return 0;
    }

    /* The remainder is never larger than `key`, so it always fits. */
    return (uint32_t) (key % table_size);
}
13+
14+
15+
hash_table_t create_table(const size_t num_elements)
16+
{
17+
const size_t size = num_elements * 2; // double for lower load factor
18+
hash_table_t ht;
19+
ht.size = size;
20+
21+
// Allocate array of buckets
22+
ht.table = (bucket_t *) calloc(size, sizeof(bucket_t));
23+
if (ht.table == NULL) {
24+
fprintf(stderr, "Failed to allocate hash table\n");
25+
exit(EXIT_FAILURE);
26+
}
27+
28+
return ht;
29+
}
30+
31+
32+
void free_table(const hash_table_t *ht)
33+
{
34+
for (size_t i = 0; i < ht->size; i++) {
35+
free(ht->table[i].tuples);
36+
}
37+
free(ht->table);
38+
}
39+
40+
41+
/**
 * Append a tuple pointer to a bucket, growing its array when it is full.
 *
 * An empty array starts with room for 4 pointers; afterwards the capacity
 * doubles on every growth step.
 *
 * @param bucket bucket to append to
 * @param el     tuple pointer to store
 * @return true on success, false if the array could not be enlarged (the
 *         bucket is left unchanged in that case)
 */
bool bucket_insert(bucket_t *bucket, triple_t *el)
{
    const bool is_full = (bucket->count == bucket->capacity);

    if (is_full) {
        size_t grown;
        if (bucket->capacity == 0) {
            grown = 4;
        } else {
            grown = bucket->capacity * 2;
        }

        /* Keep the old array reachable until the enlargement has succeeded. */
        triple_t **resized = realloc(bucket->tuples, grown * sizeof *resized);
        if (resized == NULL) {
            return false;
        }

        bucket->tuples = resized;
        bucket->capacity = grown;
    }

    bucket->tuples[bucket->count] = el;
    bucket->count++;
    return true;
}
54+
55+
56+
/**
 * Compare one 32-bit word of a tuple against a key.
 *
 * @param offset position of the word inside the tuple, counted in uint32_t units
 * @param el     tuple to inspect, viewed as an array of uint32_t words
 * @param key    value to compare against
 * @return true when the word at `offset` equals `key`
 */
bool triple_offset_equal(const uint32_t offset, triple_t *el, uint32_t key)
{
    const uint32_t *words = (uint32_t *) el;
    return words[offset] == key;
}
60+
61+
62+
/**
 * Store a tuple pointer in the bucket that belongs to its key.
 *
 * The key is the uint32_t word at `offset` inside `el`. Starting at the
 * key's home slot the table is probed linearly; the tuple goes into the
 * first bucket that is either empty or already holds tuples with the same
 * key.
 *
 * @param ht     table to insert into
 * @param offset position of the key inside the tuple, counted in uint32_t words
 * @param el     tuple to store; only the pointer is kept
 * @return true on success; false when `ht`/`el` is NULL, the table has no
 *         slots, every slot is occupied by a different key, or an
 *         allocation fails
 */
bool insert(hash_table_t *ht, const uint32_t offset, triple_t *el)
{
    /* A table without slots cannot be hashed into (modulo by zero). */
    if (ht == NULL || ht->table == NULL || ht->size == 0 || el == NULL) {
        return false;
    }

    const uint32_t key = *((uint32_t *) el + offset);
    const size_t idx = hash(key, ht->size);

    for (size_t i = 0; i < ht->size; i++) {
        const size_t probe_idx = (idx + i) % ht->size;
        bucket_t *bucket = &ht->table[probe_idx];

        /* Never-used slot: give it its initial pointer array. */
        if (bucket->tuples == NULL) {
            bucket->tuples = malloc(4 * sizeof(triple_t *));
            if (bucket->tuples == NULL) {
                return false;
            }
            bucket->capacity = 4;
            bucket->count = 0;
        }

        /* Claim an empty bucket, or join the bucket that already has this key. */
        if (bucket->count == 0 || triple_offset_equal(offset, bucket->tuples[0], key)) {
            return bucket_insert(bucket, el);
        }
    }

    return false; /* every slot belongs to some other key */
}
86+
87+
88+
/**
 * Look up the bucket holding the tuples whose key equals the key of `el`.
 *
 * The key is read from `el` at `offset_in2` and compared against the word
 * at `offset_in1` of the first tuple of each probed bucket. Probing is
 * linear from the key's home slot and stops at the first never-used slot.
 *
 * @param ht         table to search
 * @param offset_in1 word offset of the key inside the tuples stored in the table
 * @param offset_in2 word offset of the key inside `el`
 * @param el         tuple whose key is looked up
 * @return the matching bucket, or NULL when the key is absent, `ht`/`el` is
 *         NULL, or the table has no slots
 */
bucket_t *contains(const hash_table_t *ht, const uint32_t offset_in1, const uint32_t offset_in2, triple_t *el)
{
    /* A table without slots cannot be hashed into (modulo by zero). */
    if (ht == NULL || ht->table == NULL || ht->size == 0 || el == NULL) {
        return NULL;
    }

    const uint32_t key = *((uint32_t *) el + offset_in2);
    const size_t idx = hash(key, ht->size);

    for (size_t i = 0; i < ht->size; i++) {
        const size_t probe_idx = (idx + i) % ht->size;
        bucket_t *bucket = &ht->table[probe_idx];

        /* A never-used slot ends the probe sequence: the key was never inserted. */
        if (bucket->tuples == NULL) {
            return NULL;
        }

        if (bucket->count > 0 && triple_offset_equal(offset_in1, bucket->tuples[0], key)) {
            return bucket;
        }
    }

    return NULL;
}

src/memory.c

Lines changed: 29 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,33 +6,55 @@
66

77
static size_t total_allocated = 0;
88
static size_t allocation_count = 0;
9+
static size_t current = 0;
10+
static size_t peak_allocated = 0;
911

1012

11-
void* tracked_malloc(size_t size) {
12-
void* ptr = malloc(size);
13+
void *tracked_malloc(size_t size)
14+
{
15+
void *ptr = malloc(size);
1316
if (ptr) {
1417
total_allocated += size;
18+
current += size;
19+
if (current > peak_allocated) peak_allocated = current;
1520
allocation_count++;
1621
}
1722
return ptr;
1823
}
1924

20-
void* tracked_realloc(void* ptr, size_t new_size) {
25+
26+
void *tracked_realloc(void *ptr, size_t new_size)
27+
{
2128
total_allocated -= (new_size / 2);
29+
current -= (new_size/2);
2230

23-
void* new_ptr = realloc(ptr, new_size);
31+
void *new_ptr = realloc(ptr, new_size);
2432
if (new_ptr) {
2533
total_allocated += new_size;
34+
current += new_size;
35+
if (current > peak_allocated) peak_allocated = current;
2636
allocation_count++;
2737
}
2838
return new_ptr;
2939
}
3040

31-
size_t get_alloc_count() {return allocation_count;}
32-
size_t get_total_allocated() {return total_allocated;}
41+
42+
void tracked_free(void *ptr, const size_t size)
43+
{
44+
free(ptr);
45+
current -= size;
46+
}
47+
48+
49+
/** @return number of successful tracked allocations since the last reset. */
size_t get_alloc_count()
{
    return allocation_count;
}
50+
/** @return running total of bytes recorded by the tracked allocators since the last reset. */
size_t get_total_allocated()
{
    return total_allocated;
}
51+
/** @return highest value the live-byte counter has reached since the last reset. */
size_t get_peak_allocated()
{
    return peak_allocated;
}
52+
3353

3454
void reset_memory_counter()
3555
{
3656
total_allocated = 0;
3757
allocation_count = 0;
38-
}
58+
peak_allocated = 0;
59+
current = 0;
60+
}

0 commit comments

Comments
 (0)