Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add pre_resolve_list #66

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion packages/binding/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ struct JsDownloadOptions {
pub entry_listener: Option<JsFunction>,
pub retry_time: Option<u32>,
pub toc_path: Option<JsTocPath>,
pub registries: Option<Vec<String>>,
}

#[napi(object)]
Expand Down Expand Up @@ -118,6 +119,12 @@ fn parse_download_options(
None
};

let registries = if let Some(registries) = options.registries.take() {
Some(registries)
} else {
None
};

Ok(DownloadOptions {
download_dir: options.download_dir,
bucket_count: options.bucket_count as u8,
Expand All @@ -126,6 +133,7 @@ fn parse_download_options(
entry_listener,
retry_time: retry_time as u8,
toc_path,
registries,
})
}

Expand Down Expand Up @@ -276,7 +284,11 @@ impl JsDownloader {
if self.inner.is_some() {
return Ok(());
}
let http_pool = HTTPPool::new(self.options.http_concurrent_count).map_err(|e| {
let http_pool = HTTPPool::new(
self.options.http_concurrent_count,
self.options.registries.clone(),
)
.map_err(|e| {
Error::new(
Status::FunctionExpected,
format!("create reqwester failed: {:?}", e),
Expand Down
1 change: 1 addition & 0 deletions packages/cli/lib/download_dependency.js
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ async function download(options) {
const downloader = new Downloader({
entryListener,
productionMode: options.productionMode,
registries: options.registries,
});
await downloader.init();

Expand Down
3 changes: 3 additions & 0 deletions packages/cli/lib/downloader.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ class Downloader {
* @param {NodeJS.Architecture} [options.arch] -
* @param {boolean} [options.productionMode] -
* @param {{function(*)}} [options.entryListener] -
* @param {Array<string>} [options.registries] -
*/
constructor(options) {
this.entryListener = options.entryListener;
Expand All @@ -30,6 +31,7 @@ class Downloader {
this.rapidDownloader = this.createRapidDownloader();
this.taskMap = new Map();
this._dumpData = null;
this.registries = options.registries;
}

async init() {
Expand Down Expand Up @@ -58,6 +60,7 @@ class Downloader {
map: npmCacheConfigPath,
index: npmIndexConfigPath,
},
registries: this.registries,
});
}

Expand Down
2 changes: 1 addition & 1 deletion packages/downloader/src/download.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ mod test {
bucket_path: &str,
entry_listener: Option<EntryListener>,
) -> Downloader {
let http_pool = HTTPPool::new(1).expect("create http pool failed");
let http_pool = HTTPPool::new(1, None).expect("create http pool failed");
let store = NpmStore::new(
1,
Path::new(bucket_path),
Expand Down
7 changes: 4 additions & 3 deletions packages/downloader/src/http/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,15 @@ impl Executor for HTTPReqwester {
}

impl HTTPPool {
pub fn new(max_concurrent: u8) -> ProjectResult<HTTPPool> {
pub fn new(max_concurrent: u8, registries: Option<Vec<String>>) -> ProjectResult<HTTPPool> {
let client_builder = reqwest::ClientBuilder::new()
.tcp_keepalive(Duration::from_secs(60))
.connection_verbose(true)
.redirect(reqwest::redirect::Policy::limited(10))
.http1_only()
.use_rustls_tls();
let client_builder = HTTPReqwester::prepare_dns_resolve(client_builder)?;
let domains = registries.unwrap_or(vec!["registry.npmmirror.com".to_owned()]);
let client_builder = HTTPReqwester::prepare_dns_resolve(client_builder, domains)?;
let client = client_builder.build()?;
let client = Arc::new(client);
let mut reqwesters = Vec::with_capacity(max_concurrent as usize);
Expand Down Expand Up @@ -101,7 +102,7 @@ mod test {

#[tokio::test]
async fn test_pool() {
let pool = HTTPPool::new(2).unwrap();
let pool = HTTPPool::new(2, None).unwrap();
let (sx, mut rx) = mpsc::channel::<DownloadResponse>(2);
let download_handler = tokio::spawn(async move {
while let Some(mut response) = rx.recv().await {
Expand Down
11 changes: 6 additions & 5 deletions packages/downloader/src/http/reqwester.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,16 @@ pub struct HTTPReqwester {
}

impl HTTPReqwester {
pub fn new() -> TnpmResult<Self> {
pub fn new(registries: Option<Vec<String>>) -> TnpmResult<Self> {
let client_builder = reqwest::ClientBuilder::new()
.tcp_keepalive(Duration::from_secs(60))
.connection_verbose(true)
.redirect(reqwest::redirect::Policy::limited(10))
.http1_only()
.use_rustls_tls();

let client_builder = HTTPReqwester::prepare_dns_resolve(client_builder)?;
let domains = registries.unwrap_or(vec!["registry.npmmirror.com".to_owned()]);
let client_builder = HTTPReqwester::prepare_dns_resolve(client_builder, domains)?;
let client = client_builder.build()?;
Ok(HTTPReqwester {
client: Arc::new(client),
Expand All @@ -43,9 +44,9 @@ impl HTTPReqwester {

pub(crate) fn prepare_dns_resolve(
mut client_builder: reqwest::ClientBuilder,
pre_resolve_list: Vec<String>,
) -> Result<reqwest::ClientBuilder, IoError> {
// 在大量连接建立时,容易发生 DNS 超时
let pre_resolve_list = vec!["registry.npmmirror.com"];
let mut client_builder = client_builder;
for address in pre_resolve_list {
let address_with_port = format!("{}:443", address);
Expand All @@ -56,7 +57,7 @@ impl HTTPReqwester {
format!("not found address for {}", address),
)
})?;
client_builder = client_builder.resolve(address, socket_addr);
client_builder = client_builder.resolve(&address, socket_addr);
}
Ok(client_builder)
}
Expand Down Expand Up @@ -100,7 +101,7 @@ mod test {

#[tokio::test]
async fn test_download() {
let req = HTTPReqwester::new().unwrap();
let req = HTTPReqwester::new(None).unwrap();
let stream = req
.request(
PackageRequestBuilder::new()
Expand Down
4 changes: 3 additions & 1 deletion packages/downloader/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ pub struct DownloadOptions {
pub entry_listener: Option<EntryListener>,
pub download_timeout: Duration,
pub toc_path: Option<TocPath>,
pub registries: Option<Vec<String>>,
}

pub async fn download(
Expand All @@ -70,7 +71,7 @@ pub async fn download(
entry_listener,
)
.await?;
let http_pool = HTTPPool::new(http_concurrent_count)?;
let http_pool = HTTPPool::new(http_concurrent_count, opts.registries)?;
let toc_index_store = Arc::new(TocIndexStore::new());
let mut downloader = Downloader::new(store, http_pool, toc_index_store.clone(), retry_time);
downloader.batch_download(pkg_requests).await?;
Expand Down Expand Up @@ -110,6 +111,7 @@ mod test_downloader {
entry_listener: None,
retry_time: 1,
toc_path: None,
registries: None,
},
)
.await
Expand Down
4 changes: 2 additions & 2 deletions packages/downloader/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use tokio::task::JoinHandle;
async fn black_hole_download() {
let lock = get_chair_lock();
let pkg_requests = lock.list_all_packages();
let pool = HTTPPool::new(50).unwrap();
let pool = HTTPPool::new(50, None).unwrap();
let bucket = 50;
let (sx, rx_list) = subscriber(bucket);
let handlers: Vec<JoinHandle<()>> = rx_list
Expand Down Expand Up @@ -61,7 +61,7 @@ async fn file_download() {
}
});

let http_pool = HTTPPool::new(bucket_size * 2).unwrap();
let http_pool = HTTPPool::new(bucket_size * 2, None).unwrap();
let store = NpmStore::new(
bucket_size,
download_dir.as_path(),
Expand Down
Loading