diff --git a/c/opendht.cpp b/c/opendht.cpp index 282bc5d97..538fa78fa 100644 --- a/c/opendht.cpp +++ b/c/opendht.cpp @@ -298,6 +298,24 @@ void dht_runner_bootstrap(dht_runner* r, const char* host, const char* service) runner->bootstrap(host); } +void dht_runner_bootstrap2(dht_runner* r, struct sockaddr *addrs[], socklen_t addrs_len[], dht_done_cb done_cb, void* cb_user_data) { + auto runner = reinterpret_cast(r); + + std::vector sa; + + size_t i = 0; + while(addrs != nullptr && addrs[i] != nullptr) { + sa.push_back(dht::SockAddr(addrs[i], addrs_len[i])); + i++; + } + + auto cb = [done_cb, cb_user_data](bool success) { + done_cb(success, cb_user_data); + }; + + runner->bootstrap(sa, cb); +} + void dht_runner_get(dht_runner* r, const dht_infohash* h, dht_get_cb cb, dht_done_cb done_cb, void* cb_user_data) { auto runner = reinterpret_cast(r); auto hash = reinterpret_cast(h); diff --git a/c/opendht_c.h b/c/opendht_c.h index be856cbb4..c27578300 100644 --- a/c/opendht_c.h +++ b/c/opendht_c.h @@ -94,6 +94,7 @@ OPENDHT_C_PUBLIC const char* dht_value_get_user_type(const dht_value* data); typedef bool (*dht_get_cb)(const dht_value* value, void* user_data); typedef bool (*dht_value_cb)(const dht_value* value, bool expired, void* user_data); typedef void (*dht_done_cb)(bool ok, void* user_data); +typedef void (*dht_bootstrap_cb)(bool ok); typedef void (*dht_shutdown_cb)(void* user_data); struct OPENDHT_C_PUBLIC dht_op_token; @@ -139,6 +140,7 @@ OPENDHT_C_PUBLIC void dht_runner_run(dht_runner* runner, in_port_t port); OPENDHT_C_PUBLIC void dht_runner_run_config(dht_runner* runner, in_port_t port, const dht_runner_config* config); OPENDHT_C_PUBLIC void dht_runner_ping(dht_runner* runner, struct sockaddr* addr, socklen_t addr_len); OPENDHT_C_PUBLIC void dht_runner_bootstrap(dht_runner* runner, const char* host, const char* service); +OPENDHT_C_PUBLIC void dht_runner_bootstrap2(dht_runner* r, struct sockaddr *addrs[], socklen_t addrs_len[], dht_done_cb done_cb, void* cb_user_data); OPENDHT_C_PUBLIC void dht_runner_get(dht_runner* runner, const dht_infohash* hash, dht_get_cb cb, dht_done_cb done_cb, void* cb_user_data); OPENDHT_C_PUBLIC dht_op_token* dht_runner_listen(dht_runner* runner, const dht_infohash* hash, dht_value_cb cb, dht_shutdown_cb done_cb, void* cb_user_data); OPENDHT_C_PUBLIC void dht_runner_cancel_listen(dht_runner* runner, const dht_infohash* hash, dht_op_token* token); diff --git a/rust/Cargo.toml b/rust/Cargo.toml index 666c8c142..b479c80d7 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -8,4 +8,5 @@ edition = "2018" [dependencies] libc="0.2.0" -os_socketaddr="0.1.0" \ No newline at end of file +os_socketaddr="0.1.0" +futures="0.3.4" diff --git a/rust/examples/dhtnode_async.rs b/rust/examples/dhtnode_async.rs new file mode 100644 index 000000000..d8290b61e --- /dev/null +++ b/rust/examples/dhtnode_async.rs @@ -0,0 +1,69 @@ +/* + * Copyright (C) 2014-2020 Savoir-faire Linux Inc. + * Author: Sébastien Blin + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +extern crate opendht; +use std::{thread, time}; + +use opendht::{ InfoHash, DhtRunner, DhtRunnerConfig, Value }; +// use opendht::crypto::*; +use futures::prelude::*; + +fn main() { + println!("{}", InfoHash::random()); + println!("{}", InfoHash::new()); + println!("{}", InfoHash::new().is_zero()); + println!("{}", InfoHash::get("alice")); + println!("{}", InfoHash::get("alice").is_zero()); + + + let mut dht = DhtRunner::new(); + let /*mut*/ config = DhtRunnerConfig::new(); + //// If you want to inject a certificate, uncomment the following lines and previous mut. + //// Note: you can generate a certificate with + //// openssl req -x509 -newkey rsa:4096 -sha256 -days 3650 -nodes -keyout example.key -out example.crt -subj /CN=example.com + //let cert = DhtCertificate::import("example.crt").ok().expect("Invalid cert file"); + //let pk = PrivateKey::import("example.key", ""); + //config.set_identity(cert, pk); + dht.run_config(1412, config); + use std::net::ToSocketAddrs; + let addrs = "bootstrap.jami.net:4222".to_socket_addrs().unwrap(); + + futures::executor::block_on(async { + let r = dht.bootstrap_async(addrs).await; + + println!("Current node id: {}", dht.node_id()); + + let mut stream = dht.get_async(&InfoHash::get("bob")); + + while let Ok(Some(value)) = stream.try_next().await { + println!("GOT: VALUE - value: {}", value); + } + + dht.put_async(&InfoHash::get("bob"), Value::new("hi!"), false).await; + + println!("Start listening /foo (sleep 10s)"); + let mut stream = dht.listen_async(&InfoHash::get("foo")); + let one_min = time::Duration::from_secs(10); + thread::sleep(one_min); + while let Some((v, expired)) = stream.next().await { + println!("LISTEN: DONE CB - v: {} - expired: {}", v, expired); + } + }); + + println!("Public ips: {:#?}", dht.public_addresses()); +} \ No newline at end of file diff --git a/rust/src/dhtrunner.rs b/rust/src/dhtrunner.rs index 12d31fe6a..0a5dc4e7f 100644 --- a/rust/src/dhtrunner.rs +++ b/rust/src/dhtrunner.rs @@ -25,6 +25,8 @@ use std::ptr; pub use crate::ffi::*; use std::net::SocketAddr; use os_socketaddr::OsSocketAddr; +use futures::prelude::*; +use futures::channel::mpsc; impl DhtRunnerConfig { @@ -114,6 +116,17 @@ extern fn get_handler_cb(v: *mut Value, ptr: *mut c_void) -> bool { } } +extern fn get_async_handler_cb(v: *mut Value, ptr: *mut c_void) -> bool { + if ptr.is_null() { + return true; + } + let f = unsafe { + let tx = ptr as *mut mpsc::UnboundedSender>>>; + (*tx).send(Some(Ok((*v).boxed()))) + }; + futures::executor::block_on(f).is_ok() +} + extern fn done_handler_cb(ok: bool, ptr: *mut c_void) { unsafe { let handler = Box::from_raw(ptr as *mut GetHandler); @@ -121,6 +134,31 @@ extern fn done_handler_cb(ok: bool, ptr: *mut c_void) { } } +extern fn done_async_handler_cb(ok: bool, ptr: *mut c_void) { + if ptr.is_null() { + return; + } + + let mut tx = unsafe { + let ptr = ptr as *mut mpsc::UnboundedSender>>>; + Box::from_raw(ptr) + }; + let item = if ok { None } else { Some(Err(std::io::Error::new(std::io::ErrorKind::Other, "get failed"))) }; + let _ = futures::executor::block_on((*tx).send(item)); +} + +extern fn bootstrap_done_async_handler_cb(ok: bool, ptr: *mut c_void) { + if ptr.is_null() { + return; + } + + let tx = unsafe { + let ptr = ptr as *mut futures::channel::oneshot::Sender; + Box::from_raw(ptr) + }; + let _ = (*tx).send(ok); +} + struct PutHandler<'a> { done_cb: &'a mut(dyn FnMut(bool)) @@ -165,6 +203,14 @@ extern fn listen_handler_done(ptr: *mut c_void) { } } +struct VerboseDrop<'a, T>(T, &'a str); + +impl<'a, T> VerboseDrop<'a, T> { + fn drop(&mut self) { + println!("{}", self.1); + } +} + impl DhtRunner { pub fn new() -> Box { unsafe { @@ -187,11 +233,30 @@ impl DhtRunner { pub fn bootstrap(&mut self, host: &str, service: u16) { unsafe { dht_runner_bootstrap(&mut *self, - CString::new(host).unwrap().as_ptr(), - CString::new(service.to_string()).unwrap().as_ptr()) + CString::new(host).unwrap().as_ptr(), + CString::new(service.to_string()).unwrap().as_ptr()) } } + pub async fn bootstrap_async>(&mut self, addrs: A) -> std::io::Result { + let socks: Vec = addrs.map(|a| a.into()).collect(); + let sizes: Vec = socks.iter().map(|s| s.len()).collect(); + + let (tx, rx) = futures::channel::oneshot::channel(); + + let tx = Box::new(tx); + let tx = Box::into_raw(tx) as *mut c_void; + + unsafe { + dht_runner_bootstrap2(&mut *self, socks.as_ptr() as *const *const _, + sizes.as_ptr() as *const *const _, bootstrap_done_async_handler_cb as *mut c_void, tx); + } + + let success = rx.await.expect("bootstrap_async() sender was dropped unexpectedly"); + + Ok(success) + } + pub fn node_id(&self) -> InfoHash { unsafe { dht_runner_get_node_id(&*self) @@ -217,6 +282,19 @@ impl DhtRunner { } } + pub fn get_async(&mut self, h: &InfoHash) + -> impl TryStream, Error=std::io::Error> + Unpin { + let (tx, rx) = mpsc::unbounded(); + let tx = Box::new(tx); + let tx = Box::into_raw(tx) as *mut c_void; + + unsafe { + dht_runner_get(&mut *self, h, get_async_handler_cb, done_async_handler_cb, tx) + } + rx.take_while(|item: &Option<_>| futures::future::ready(item.is_some())) + .filter_map(|item| futures::future::ready(item)) + } + pub fn put<'a>(&mut self, h: &InfoHash, v: Box, done_cb: &'a mut(dyn FnMut(bool)), permanent: bool) { let handler = Box::new(PutHandler { @@ -228,6 +306,21 @@ impl DhtRunner { } } + pub async fn put_async(&mut self, h: &InfoHash, v: Box, permanent: bool) -> bool { + let (tx, rx) = futures::channel::oneshot::channel(); + let mut tx = Some(tx); + + let mut done_cb = move |success| { + if let Some(tx) = tx.take() { + tx.send(success).expect("put_async() receiver was dropped unexpectedly"); + } + }; + + self.put(h, v, &mut done_cb, permanent); + + rx.await.expect("put_async() sender was dropped unexpectedly") + } + pub fn put_signed<'a>(&mut self, h: &InfoHash, v: Box, done_cb: &'a mut(dyn FnMut(bool)), permanent: bool) { let handler = Box::new(PutHandler { @@ -239,6 +332,21 @@ impl DhtRunner { } } + pub async fn put_signed_async(&mut self, h: &InfoHash, v: Box, permanent: bool) -> bool { + let (tx, rx) = futures::channel::oneshot::channel(); + let mut tx = Some(tx); + + let mut done_cb = move |success| { + if let Some(tx) = tx.take() { + tx.send(success).expect("put_signed_async() receiver was dropped unexpectedly"); + } + }; + + self.put_signed(h, v, &mut done_cb, permanent); + + rx.await.expect("put_signed_async() sender was dropped unexpectedly") + } + pub fn put_encrypted<'a>(&mut self, h: &InfoHash, to: &InfoHash, v: Box, done_cb: &'a mut(dyn FnMut(bool)), permanent: bool) { let handler = Box::new(PutHandler { @@ -250,6 +358,22 @@ impl DhtRunner { } } + pub async fn put_encrypted_async(&mut self, h: &InfoHash, to: &InfoHash, v: Box, + permanent: bool) -> bool { + let (tx, rx) = futures::channel::oneshot::channel(); + let mut tx = Some(tx); + + let mut done_cb = move |success| { + if let Some(tx) = tx.take() { + tx.send(success).expect("put_encrypted_async() receiver was dropped unexpectedly"); + } + }; + + self.put_encrypted(h, to, v, &mut done_cb, permanent); + + rx.await.expect("put_encrypted_async() sender was dropped unexpectedly") + } + pub fn cancel_put<'a>(&mut self, h: &InfoHash, vid: u64) { unsafe { dht_runner_cancel_put(&mut *self, h, vid) @@ -267,6 +391,20 @@ impl DhtRunner { } } + pub fn listen_async(&mut self, h: &InfoHash) + -> impl Stream, bool)> + Unpin + { + let (mut tx, rx) = mpsc::unbounded(); + + let mut value_cb = move |v, expired| { + futures::executor::block_on(tx.send((v, expired))).is_ok() + }; + + let _token = self.listen(h, &mut value_cb); + + return Box::pin(rx); + } + pub fn cancel_listen(&mut self, h: &InfoHash, token: Box) { unsafe { dht_runner_cancel_listen(&mut *self, h, &*token) @@ -282,6 +420,16 @@ impl DhtRunner { } } + pub async fn shutdown_async<'a>(&'a mut self) -> bool { + let (tx, rx) = futures::channel::oneshot::channel(); + let tx = Box::new(tx); + let ptr = Box::into_raw(tx) as *mut c_void; + + self.shutdown(done_async_handler_cb, ptr); + + rx.await.expect("shutdown_async() sender was dropped unexpectedly") + } + pub fn public_addresses(&self) -> Vec { let mut result = Vec::new(); unsafe { diff --git a/rust/src/ffi.rs b/rust/src/ffi.rs index c2c36f2ec..4e4cbee74 100644 --- a/rust/src/ffi.rs +++ b/rust/src/ffi.rs @@ -189,6 +189,9 @@ extern { pub fn dht_runner_run(dht: *mut DhtRunner, port: in_port_t); pub fn dht_runner_run_config(dht: *mut DhtRunner, port: in_port_t, config: *const DhtRunnerConfig); pub fn dht_runner_bootstrap(dht: *mut DhtRunner, host: *const c_char, service: *const c_char); + pub fn dht_runner_bootstrap2(dht: *mut DhtRunner, addrs: *const *const OsSocketAddr, + addr_lens: *const *const libc::socklen_t, + done_cb: *mut c_void, cb_user_data: *mut c_void); pub fn dht_runner_get(dht: *mut DhtRunner, h: *const InfoHash, get_cb: extern fn(*mut Value, *mut c_void) -> bool, done_cb: extern fn(bool, *mut c_void),