Skip to content

Commit 15a4de8

Browse files
committed
Emit Download and Append events
1 parent 37d2a9c commit 15a4de8

4 files changed

Lines changed: 123 additions & 28 deletions

File tree

src/event.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,8 @@
1-
/// Events emitted.
1+
/// An event emitted by a Feed.
22
#[derive(Debug, Clone, PartialEq)]
3-
pub enum Event {}
3+
pub enum Event {
4+
/// A new block has been appended.
5+
Append,
6+
/// A new block has been downloaded.
7+
Download(u64),
8+
}

src/feed.rs

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
//! Hypercore's main abstraction. Exposes an append-only, secure log structure.
22
3+
use crate::event::Event;
34
use crate::feed_builder::FeedBuilder;
45
use crate::replicate::{Message, Peer};
56
pub use crate::storage::{Node, NodeTrait, Storage, Store};
@@ -12,6 +13,10 @@ use crate::crypto::{
1213
use crate::proof::Proof;
1314
use anyhow::{bail, ensure, Result};
1415
use flat_tree as flat;
16+
use futures::channel::mpsc::{
17+
unbounded as channel, UnboundedReceiver as Receiver, UnboundedSender as Sender,
18+
};
19+
use futures::sink::SinkExt;
1520
use pretty_hash::fmt as pretty_fmt;
1621
use random_access_disk::RandomAccessDisk;
1722
use random_access_memory::RandomAccessMemory;
@@ -72,6 +77,7 @@ where
7277
pub(crate) bitfield: Bitfield,
7378
pub(crate) tree: TreeIndex,
7479
pub(crate) peers: Vec<Peer>,
80+
pub(crate) subscribers: Vec<Sender<Event>>,
7581
}
7682

7783
impl<T> Feed<T>
@@ -163,9 +169,25 @@ where
163169
self.tree.set(tree_index(index));
164170
self.length += 1;
165171

172+
self.emit(Event::Append).await;
173+
166174
Ok(())
167175
}
168176

177+
/// Subscribe to events emitted by this feed.
178+
pub fn subscribe(&mut self) -> Receiver<Event> {
179+
let (send, recv) = channel();
180+
self.subscribers.push(send);
181+
recv
182+
}
183+
184+
/// Emit an event on the feed.
185+
async fn emit(&self, event: Event) {
186+
for mut sender in self.subscribers.iter() {
187+
sender.send(event.clone()).await.unwrap();
188+
}
189+
}
190+
169191
/// Get the block of data at the tip of the feed. This will be the most
170192
/// recently appended block.
171193
#[inline]
@@ -399,7 +421,7 @@ where
399421

400422
if let Some(_data) = data {
401423
if self.bitfield.set(index, true).is_changed() {
402-
// TODO: emit "download" event
424+
self.emit(Event::Download(index)).await;
403425
}
404426
// TODO: check peers.length, call ._announce if peers exist.
405427
}

src/feed_builder.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ where
5656
secret_key: self.secret_key,
5757
storage: self.storage,
5858
peers: vec![],
59+
subscribers: vec![],
5960
})
6061
}
6162
}

tests/feed.rs

Lines changed: 92 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,8 @@ extern crate random_access_memory as ram;
33
mod common;
44

55
use common::create_feed;
6-
use hypercore::{generate_keypair, Feed, NodeTrait, PublicKey, SecretKey, Storage};
6+
use futures::stream::StreamExt;
7+
use hypercore::{generate_keypair, Event, Feed, NodeTrait, PublicKey, SecretKey, Storage};
78
use random_access_storage::RandomAccess;
89
use std::env::temp_dir;
910
use std::fmt::Debug;
@@ -152,14 +153,8 @@ async fn put() {
152153
async fn put_with_data() {
153154
// Create a writable feed.
154155
let mut a = create_feed(50).await.unwrap();
155-
156156
// Create a second feed with the first feed's key.
157-
let (public, secret) = copy_keys(&a);
158-
let storage = Storage::new_memory().await.unwrap();
159-
let mut b = Feed::builder(public, storage)
160-
.secret_key(secret)
161-
.build()
162-
.unwrap();
157+
let mut b = create_clone(&a).await.unwrap();
163158

164159
// Append 4 blocks of data to the writable feed.
165160
a.append(b"hi").await.unwrap();
@@ -217,23 +212,6 @@ async fn create_with_stored_keys() {
217212
);
218213
}
219214

220-
fn copy_keys(
221-
feed: &Feed<impl RandomAccess<Error = Box<dyn std::error::Error + Send + Sync>> + Debug + Send>,
222-
) -> (PublicKey, SecretKey) {
223-
match &feed.secret_key() {
224-
Some(secret) => {
225-
let secret = secret.to_bytes();
226-
let public = &feed.public_key().to_bytes();
227-
228-
let public = PublicKey::from_bytes(public).unwrap();
229-
let secret = SecretKey::from_bytes(&secret).unwrap();
230-
231-
(public, secret)
232-
}
233-
_ => panic!("<tests/common>: Could not access secret key"),
234-
}
235-
}
236-
237215
#[async_std::test]
238216
async fn audit() {
239217
let mut feed = create_feed(50).await.unwrap();
@@ -298,3 +276,92 @@ async fn audit_bad_data() {
298276
}
299277
}
300278
}
279+
280+
#[async_std::test]
281+
async fn events_append() {
282+
let mut feed = create_feed(50).await.unwrap();
283+
let event_task = collect_events(&mut feed, 3);
284+
feed.append(br#"one"#).await.unwrap();
285+
feed.append(br#"two"#).await.unwrap();
286+
feed.append(br#"three"#).await.unwrap();
287+
288+
let event_list = event_task.await;
289+
let mut expected = vec![];
290+
for _i in 0..3 {
291+
expected.push(Event::Append);
292+
}
293+
assert_eq!(event_list, expected, "Correct events emitted")
294+
}
295+
296+
#[async_std::test]
297+
async fn events_download() {
298+
let mut a = create_feed(50).await.unwrap();
299+
// Create a second feed with the first feed's key.
300+
let mut b = create_clone(&a).await.unwrap();
301+
302+
let event_task = collect_events(&mut b, 3);
303+
304+
a.append(b"one").await.unwrap();
305+
a.append(b"two").await.unwrap();
306+
a.append(b"three").await.unwrap();
307+
308+
for i in 0..3 {
309+
let a_proof = a.proof(i, false).await.unwrap();
310+
let a_data = a.get(i).await.unwrap();
311+
b.put(i, a_data.as_deref(), a_proof).await.unwrap();
312+
}
313+
314+
let event_list = event_task.await;
315+
316+
let mut expected = vec![];
317+
for i in 0..3 {
318+
expected.push(Event::Download(i));
319+
}
320+
assert_eq!(event_list, expected, "Correct events emitted")
321+
}
322+
323+
async fn create_clone(
324+
feed: &Feed<impl RandomAccess<Error = Box<dyn std::error::Error + Send + Sync>> + Debug + Send>,
325+
) -> Result<Feed<ram::RandomAccessMemory>, anyhow::Error> {
326+
let (public, secret) = copy_keys(&feed);
327+
let storage = Storage::new_memory().await?;
328+
let clone = Feed::builder(public, storage).secret_key(secret).build()?;
329+
Ok(clone)
330+
}
331+
332+
fn copy_keys(
333+
feed: &Feed<impl RandomAccess<Error = Box<dyn std::error::Error + Send + Sync>> + Debug + Send>,
334+
) -> (PublicKey, SecretKey) {
335+
match &feed.secret_key() {
336+
Some(secret) => {
337+
let secret = secret.to_bytes();
338+
let public = &feed.public_key().to_bytes();
339+
340+
let public = PublicKey::from_bytes(public).unwrap();
341+
let secret = SecretKey::from_bytes(&secret).unwrap();
342+
343+
(public, secret)
344+
}
345+
_ => panic!("<tests/common>: Could not access secret key"),
346+
}
347+
}
348+
349+
fn collect_events(
350+
feed: &mut Feed<
351+
impl RandomAccess<Error = Box<dyn std::error::Error + Send + Sync>> + Debug + Send,
352+
>,
353+
n: usize,
354+
) -> async_std::task::JoinHandle<Vec<Event>> {
355+
let mut events = feed.subscribe();
356+
let event_task = async_std::task::spawn(async move {
357+
let mut event_list = vec![];
358+
while let Some(event) = events.next().await {
359+
event_list.push(event);
360+
if event_list.len() == n {
361+
return event_list;
362+
}
363+
}
364+
event_list
365+
});
366+
event_task
367+
}

0 commit comments

Comments
 (0)