Skip to content

Commit 4560380

Browse files
committed
Added Citus example [skip ci]
1 parent cc7e187 commit 4560380

3 files changed

Lines changed: 83 additions & 0 deletions

File tree

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ Or check out some examples:
1919
- [Sparse search](examples/sparse/example.php) with Text Embeddings Inference
2020
- [Morgan fingerprints](examples/rdkit/example.php) with RDKit
2121
- [Recommendations](examples/disco/example.php) with Disco
22+
- [Horizontal scaling](examples/citus/example.php) with Citus
2223
- [Bulk loading](examples/loading/example.php) with `COPY`
2324

2425
### Laravel

examples/citus/composer.json

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
{
2+
"repositories": [
3+
{
4+
"type": "path",
5+
"url": "../.."
6+
}
7+
],
8+
"require": {
9+
"pgvector/pgvector": "dev-master"
10+
}
11+
}

examples/citus/example.php

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
<?php

declare(strict_types=1);

require_once __DIR__ . '/vendor/autoload.php';

use Pgvector\Vector;

/**
 * Opens a PostgreSQL connection and fails loudly when it cannot be established.
 *
 * @param string $connectionString libpq connection string or URI
 * @return resource|\PgSql\Connection the open connection handle
 * @throws RuntimeException when pg_connect() reports failure
 */
function connectOrFail(string $connectionString)
{
    $connection = pg_connect($connectionString);
    if ($connection === false) {
        throw new RuntimeException("Could not connect to {$connectionString}");
    }

    return $connection;
}

/**
 * Runs a statement and throws instead of returning false, so a failed step
 * stops the example immediately rather than surfacing as a later, unrelated error.
 *
 * @param resource|\PgSql\Connection $db     open connection
 * @param string                     $sql    statement text ($1, $2, ... placeholders when $params is given)
 * @param list<string>               $params values bound to the placeholders
 * @return resource|\PgSql\Result the result handle
 * @throws RuntimeException when the statement fails
 */
function queryOrFail($db, string $sql, array $params = [])
{
    $result = $params === []
        ? pg_query($db, $sql)
        : pg_query_params($db, $sql, $params);

    if ($result === false) {
        throw new RuntimeException('Query failed: ' . pg_last_error($db));
    }

    return $result;
}

// the generated data set is held in memory before loading
ini_set('memory_limit', '512M');

$database = 'pgvector_citus';
$connectionString = "postgres://localhost/{$database}";

// generate random data
$rows = 100000;
$dimensions = 128;
$embeddings = [];
$categories = [];
for ($i = 0; $i < $rows; $i++) {
    $embedding = [];
    for ($j = 0; $j < $dimensions; $j++) {
        $embedding[] = rand() / getrandmax();
    }
    $embeddings[] = $embedding;
    $categories[] = rand(1, 100);
}

// enable extensions
$db = connectOrFail($connectionString);
queryOrFail($db, 'CREATE EXTENSION IF NOT EXISTS citus');
queryOrFail($db, 'CREATE EXTENSION IF NOT EXISTS vector');

// GUC variables set on the session do not propagate to Citus workers
// https://github.com/citusdata/citus/issues/462
// you can either:
// 1. set them on the system, user, or database and reconnect
// 2. set them for a transaction with SET LOCAL
queryOrFail($db, "ALTER DATABASE {$database} SET maintenance_work_mem = '512MB'");
queryOrFail($db, "ALTER DATABASE {$database} SET hnsw.ef_search = 20");
pg_close($db);

// reconnect for updated GUC variables to take effect
$db = connectOrFail($connectionString);

echo "Creating distributed table\n";
queryOrFail($db, 'DROP TABLE IF EXISTS items');
// the column width comes from $dimensions so it always matches the generated vectors
queryOrFail($db, sprintf(
    'CREATE TABLE items (id bigserial, embedding vector(%d), category_id bigint, PRIMARY KEY (id, category_id))',
    $dimensions
));
queryOrFail($db, 'SET citus.shard_count = 4');
queryOrFail($db, "SELECT create_distributed_table('items', 'category_id')");

echo "Loading data in parallel\n";

queryOrFail($db, 'COPY items (embedding, category_id) FROM STDIN');
foreach ($embeddings as $i => $embedding) {
    // COPY text format: tab-separated columns, newline-terminated rows.
    // Neither the vector's text form nor the integer contains characters that
    // need COPY escaping, and SQL-literal escaping (pg_escape_string) is not
    // the right tool for this format, so the values are written as-is.
    $line = implode("\t", [(string) new Vector($embedding), (string) $categories[$i]]) . "\n";
    if (!pg_put_line($db, $line)) {
        throw new RuntimeException('COPY failed while sending row: ' . pg_last_error($db));
    }
}
// end-of-data marker for COPY ... FROM STDIN
if (!pg_put_line($db, "\\.\n") || !pg_end_copy($db)) {
    throw new RuntimeException('COPY did not complete: ' . pg_last_error($db));
}

echo "Creating index in parallel\n";
queryOrFail($db, 'CREATE INDEX ON items USING hnsw (embedding vector_l2_ops)');

echo "Running distributed queries\n";
for ($i = 0; $i < 10; $i++) {
    // use a randomly chosen stored embedding as the query vector
    $queryVector = new Vector($embeddings[rand(0, $rows - 1)]);
    $result = queryOrFail(
        $db,
        'SELECT id FROM items ORDER BY embedding <-> $1 LIMIT 10',
        [(string) $queryVector]
    );
    // column 0 is the only selected column (id)
    $ids = pg_fetch_all_columns($result, 0);
    echo join(', ', $ids) . "\n";
    pg_free_result($result);
}

pg_close($db);

0 commit comments

Comments
 (0)