Skip to content

Commit 3032b92

Browse files
committed
feat: private area intersection
1 parent 4ab46d6 commit 3032b92

File tree

14 files changed

+913
-105
lines changed

14 files changed

+913
-105
lines changed

Cargo.lock

Lines changed: 4 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

iroh-willow/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@ tokio-util = { version = "0.7", features = ["io-util", "io"] }
4040
tracing = "0.1"
4141
zerocopy = { version = "0.8.0-alpha.9", features = ["derive"] }
4242
hex = "0.4.3"
43+
curve25519-dalek = { version = "4.1.3", features = ["digest", "rand_core", "serde"] }
44+
sha2 = "0.10.8"
4345

4446
[dev-dependencies]
4547
iroh-test = { path = "../iroh-test" }

iroh-willow/src/actor.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -389,13 +389,18 @@ impl<S: Storage> Actor<S> {
389389
} => {
390390
let Channels { send, recv } = channels;
391391
let id = self.next_session_id();
392-
let session = Session::new(id, init.mode, our_role, send, initial_transmission);
392+
let session =
393+
Session::new(&self.store, id, our_role, send, init, initial_transmission);
394+
let session = match session {
395+
Ok(session) => session,
396+
Err(err) => return send_reply(reply, Err(err.into())),
397+
};
393398

394399
let store = self.store.clone();
395400
let cancel_token = CancellationToken::new();
396401

397402
let future = session
398-
.run(store, recv, init, cancel_token.clone())
403+
.run(store, recv, cancel_token.clone())
399404
.instrument(error_span!("session", peer = %peer.fmt_short()));
400405
let task_key = self.session_tasks.spawn_local(id, future);
401406

@@ -481,6 +486,7 @@ impl<S: Storage> Actor<S> {
481486
fn complete_session(&mut self, session_id: &SessionId, result: Result<(), Error>) {
482487
let session = self.sessions.remove(session_id);
483488
if let Some(session) = session {
489+
debug!(?session, ?result, "complete session");
484490
session.on_finish.send(result).ok();
485491
self.session_tasks.remove(&session.task_key);
486492
} else {

iroh-willow/src/auth.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use std::{
2-
collections::{BTreeSet, HashMap},
2+
collections::{BTreeMap, BTreeSet, HashMap},
33
sync::{Arc, RwLock},
44
};
55

@@ -19,6 +19,8 @@ use crate::{
1919
store::traits::{SecretStorage, SecretStoreError, Storage},
2020
};
2121

22+
pub type InterestMap = BTreeMap<ReadAuthorisation, BTreeSet<AreaOfInterest>>;
23+
2224
#[derive(Debug, Clone)]
2325
pub struct DelegateTo {
2426
pub user: UserId,
@@ -220,7 +222,7 @@ impl<S: Storage> Auth<S> {
220222
pub fn find_read_caps_for_interests(
221223
&self,
222224
interests: Interests,
223-
) -> Result<HashMap<ReadAuthorisation, BTreeSet<AreaOfInterest>>, AuthError> {
225+
) -> Result<InterestMap, AuthError> {
224226
match interests {
225227
Interests::All => {
226228
let out = self
@@ -230,11 +232,12 @@ impl<S: Storage> Auth<S> {
230232
let aoi = AreaOfInterest::new(area);
231233
(auth, BTreeSet::from_iter([aoi]))
232234
})
233-
.collect::<HashMap<_, _>>();
235+
.collect::<BTreeMap<_, _>>();
234236
Ok(out)
235237
}
236238
Interests::Some(interests) => {
237-
let mut out: HashMap<ReadAuthorisation, BTreeSet<AreaOfInterest>> = HashMap::new();
239+
let mut out: BTreeMap<ReadAuthorisation, BTreeSet<AreaOfInterest>> =
240+
BTreeMap::new();
238241
for (cap_selector, aoi_selector) in interests {
239242
let cap = self.get_read_cap(&cap_selector)?;
240243
if let Some(cap) = cap {

iroh-willow/src/net.rs

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,10 @@ impl SessionHandle {
105105
pub async fn join(&mut self) -> anyhow::Result<()> {
106106
let session_res = self.handle.on_finish().await;
107107
let net_tasks_res = join_all(&mut self.tasks).await;
108-
session_res.or(net_tasks_res)
108+
match session_res {
109+
Err(err) => Err(err.into()),
110+
Ok(()) => net_tasks_res,
111+
}
109112
}
110113
}
111114

@@ -232,7 +235,7 @@ async fn recv_loop(mut recv_stream: RecvStream, mut channel_writer: Writer) -> a
232235
let max_buffer_size = channel_writer.max_buffer_size();
233236
while let Some(buf) = recv_stream.read_chunk(max_buffer_size, true).await? {
234237
channel_writer.write_all(&buf.bytes[..]).await?;
235-
trace!(len = buf.bytes.len(), "recv");
238+
// trace!(len = buf.bytes.len(), "recv");
236239
}
237240
channel_writer.close();
238241
trace!("close");
@@ -241,9 +244,9 @@ async fn recv_loop(mut recv_stream: RecvStream, mut channel_writer: Writer) -> a
241244

242245
async fn send_loop(mut send_stream: SendStream, channel_reader: Reader) -> anyhow::Result<()> {
243246
while let Some(data) = channel_reader.read_bytes().await {
244-
let len = data.len();
247+
// let len = data.len();
245248
send_stream.write_chunk(data).await?;
246-
trace!(len, "sent");
249+
// trace!(len, "sent");
247250
}
248251
send_stream.finish().await?;
249252
trace!("close");
@@ -279,7 +282,7 @@ async fn join_all(join_set: &mut JoinSet<anyhow::Result<()>>) -> anyhow::Result<
279282
let mut joined = 0;
280283
while let Some(res) = join_set.join_next().await {
281284
joined += 1;
282-
tracing::trace!("joined {joined} tasks, remaining {}", join_set.len());
285+
trace!("joined {joined} tasks, remaining {}", join_set.len());
283286
let res = match res {
284287
Ok(Ok(())) => Ok(()),
285288
Ok(Err(err)) => Err(err),

iroh-willow/src/proto/sync.rs

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use crate::util::codec::{DecodeOutcome, Decoder, Encoder};
1010
use super::{
1111
grouping::{Area, AreaOfInterest, ThreeDRange},
1212
meadowcap,
13-
willow::{Entry, DIGEST_LENGTH},
13+
willow::{Entry, NamespaceId, DIGEST_LENGTH},
1414
};
1515

1616
pub const MAX_PAYLOAD_SIZE_POWER: u8 = 12;
@@ -55,7 +55,7 @@ pub type SyncSignature = meadowcap::UserSignature;
5555
pub type Receiver = meadowcap::UserPublicKey;
5656

5757
/// Represents an authorisation to read an area of data in a Namespace.
58-
#[derive(Debug, Clone, Serialize, Deserialize, Hash, Eq, PartialEq)]
58+
#[derive(Debug, Clone, Serialize, Deserialize, Hash, Eq, PartialEq, Ord, PartialOrd)]
5959
pub struct ReadAuthorisation(pub ReadCapability, pub Option<SubspaceCapability>);
6060

6161
impl From<ReadCapability> for ReadAuthorisation {
@@ -76,6 +76,10 @@ impl ReadAuthorisation {
7676
pub fn subspace_cap(&self) -> Option<&SubspaceCapability> {
7777
self.1.as_ref()
7878
}
79+
80+
pub fn namespace(&self) -> NamespaceId {
81+
self.0.granted_namespace().id()
82+
}
7983
}
8084

8185
/// The different resource handles employed by the WGPS.
@@ -304,7 +308,7 @@ pub enum Message {
304308
#[debug("{:?}", _0)]
305309
PaiRequestSubspaceCapability(PaiRequestSubspaceCapability),
306310
#[debug("{:?}", _0)]
307-
PaiReplySubspaceCapability(PaiReplySubspaceCapability),
311+
PaiReplySubspaceCapability(Box<PaiReplySubspaceCapability>),
308312
#[debug("{:?}", _0)]
309313
SetupBindStaticToken(SetupBindStaticToken),
310314
#[debug("{:?}", _0)]
@@ -873,39 +877,40 @@ pub struct ControlFreeHandle {
873877
handle_type: HandleType,
874878
}
875879

876-
type PsiGroup = ();
880+
pub type PsiGroupBytes = [u8; 32];
881+
877882
/// Bind data to an IntersectionHandle for performing private area intersection.
878883
#[derive(Debug, Serialize, Deserialize)]
879884
pub struct PaiBindFragment {
880885
/// The result of first applying hash_into_group to some fragment for private area intersection and then performing scalar multiplication with scalar.
881-
group_member: PsiGroup,
886+
pub group_member: PsiGroupBytes,
882887
/// Set to true if the private set intersection item is a secondary fragment.
883-
is_secondary: bool,
888+
pub is_secondary: bool,
884889
}
885890

886891
/// Finalise private set intersection for a single item.
887892
#[derive(Debug, Serialize, Deserialize)]
888893
pub struct PaiReplyFragment {
889894
/// The IntersectionHandle of the PaiBindFragment message which this finalises.
890-
handle: IntersectionHandle,
895+
pub handle: IntersectionHandle,
891896
/// The result of performing scalar multiplication between the group_member of the message that this is replying to and scalar.
892-
group_member: PsiGroup,
897+
pub group_member: PsiGroupBytes,
893898
}
894899

895900
/// Ask the receiver to send a SubspaceCapability.
896901
#[derive(Debug, Serialize, Deserialize)]
897902
pub struct PaiRequestSubspaceCapability {
898903
/// The IntersectionHandle bound by the sender for the least-specific secondary fragment for whose NamespaceId to request the SubspaceCapability.
899-
handle: IntersectionHandle,
904+
pub handle: IntersectionHandle,
900905
}
901906

902907
/// Send a previously requested SubspaceCapability.
903908
#[derive(Debug, Serialize, Deserialize)]
904909
pub struct PaiReplySubspaceCapability {
905910
/// The handle of the PaiRequestSubspaceCapability message that this answers (hence, an IntersectionHandle bound by the receiver of this message).
906-
handle: IntersectionHandle,
911+
pub handle: IntersectionHandle,
907912
/// A SubspaceCapability whose granted namespace corresponds to the request this answers.
908-
capability: SubspaceCapability,
913+
pub capability: SubspaceCapability,
909914
/// The SyncSubspaceSignature issued by the receiver of the capability over the sender’s challenge.
910-
signature: SyncSignature,
915+
pub signature: SyncSignature,
911916
}

iroh-willow/src/proto/willow.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,11 @@ impl Path {
9393
Path(path)
9494
}
9595

96+
pub fn from_components(components: &[Component]) -> Self {
97+
let path: Arc<[Component]> = components.to_vec().into();
98+
Self(path)
99+
}
100+
96101
pub fn validate(components: &[&[u8]]) -> Result<(), InvalidPath> {
97102
if components.len() > MAX_COMPONENT_COUNT {
98103
return Err(InvalidPath::TooManyComponents);
@@ -146,6 +151,14 @@ impl Path {
146151
let start = count.min(self.len());
147152
Self::new_unchecked(self[start..].to_vec())
148153
}
154+
155+
pub fn component_count(&self) -> usize {
156+
self.0.len()
157+
}
158+
159+
pub fn components(&self) -> &[Component] {
160+
&self.0
161+
}
149162
}
150163

151164
impl std::ops::Deref for Path {

iroh-willow/src/session.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use crate::{
1212
pub mod channels;
1313
mod data;
1414
mod error;
15+
mod pai;
1516
mod payload;
1617
mod reconciler;
1718
mod resource;
@@ -113,7 +114,7 @@ pub enum Scope {
113114

114115
/// Intersection between two areas of interest.
115116
#[derive(Debug, Clone)]
116-
pub struct AreaOfInterestIntersection {
117+
pub struct AoiIntersection {
117118
pub our_handle: AreaOfInterestHandle,
118119
pub their_handle: AreaOfInterestHandle,
119120
pub intersection: Area,

iroh-willow/src/session/error.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use crate::{
66
sync::ResourceHandle,
77
willow::Unauthorised,
88
},
9+
session::{pai::PaiError, resource::MissingResource},
910
store::traits::SecretStoreError,
1011
util::channel::{ReadError, WriteError},
1112
};
@@ -64,6 +65,10 @@ pub enum Error {
6465
MissingUserKey(UserId),
6566
#[error("a task failed to join")]
6667
TaskFailed(#[from] tokio::task::JoinError),
68+
#[error("no known interests for given capability")]
69+
NoKnownInterestsForCapability,
70+
#[error("private area intersection error: {0}")]
71+
Pai(#[from] PaiError),
6772
}
6873

6974
impl From<Unauthorised> for Error {
@@ -88,3 +93,9 @@ impl From<meadowcap::InvalidParams> for Error {
8893
Self::InvalidParameters("")
8994
}
9095
}
96+
97+
impl From<MissingResource> for Error {
98+
fn from(value: MissingResource) -> Self {
99+
Self::MissingResource(value.0)
100+
}
101+
}

0 commit comments

Comments
 (0)