Skip to content

Commit

Permalink
ocicrypt-rs/image-rs: make ttrpc keyprovider client sync
Browse files Browse the repository at this point in the history
The original ttrpc keyprovider client is marked `async`, but we use it
as sync version by fork a new thread and create a new tokio runtime.
This commit changes the client into sync version, this will match the
original usage.

Why we need this commit is the original fork-thread-and-tokio-runtime
way would create a dead lock for CDH's image-pull API when an encrypted
image is pulled. When we bring in a tokio::spawn_blocking in image-rs,
the tokio runtime would have possibility to switch context thus a dead
lock can be avoided.

Signed-off-by: Xynnn007 <xynnn@linux.alibaba.com>
  • Loading branch information
Xynnn007 committed Aug 15, 2024
1 parent e5546cb commit 4f9c7ca
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 72 deletions.
12 changes: 9 additions & 3 deletions image-rs/src/pull.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,10 +142,16 @@ impl<'a> PullClient<'a> {
};

let decryptor = Decryptor::from_media_type(&layer.media_type);
let decryptor_i = decryptor.clone();
let layer_i = layer.clone();
let decrypt_config_i = decrypt_config.as_ref().map(|inner| inner.to_string());
if decryptor.is_encrypted() {
let decrypt_key = decryptor
.get_decrypt_key(&layer, decrypt_config)
.map_err(|e| anyhow!("failed to get decrypt key {}", e.to_string()))?;
let decrypt_key = tokio::task::spawn_blocking(move || {
decryptor_i
.get_decrypt_key(&layer_i, &decrypt_config_i.as_deref())
.map_err(|e| anyhow!("failed to get decrypt key {}", e.to_string()))
})
.await??;
let plaintext_layer = decryptor
.async_get_plaintext_layer(layer_reader, &layer, &decrypt_key)
.map_err(|e| anyhow!("failed to async_get_plaintext_layer: {:?}", e))?;
Expand Down
2 changes: 1 addition & 1 deletion ocicrypt-rs/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
.include("src/utils")
.rust_protobuf()
.customize(ttrpc_codegen::Customize {
async_all: true,
async_all: false,
..Default::default()
})
.rust_protobuf_customize(ttrpc_codegen::ProtobufCustomize::default().gen_mod_rs(false))
Expand Down
47 changes: 7 additions & 40 deletions ocicrypt-rs/src/keywrap/keyprovider/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
use std::collections::HashMap;
use std::fmt::{self, Debug};

use anyhow::{anyhow, bail, Result};
use anyhow::{anyhow, bail, Context, Result};

Check failure on line 7 in ocicrypt-rs/src/keywrap/keyprovider/mod.rs

View workflow job for this annotation

GitHub Actions / Check (stable, s390x)

unused import: `Context`

Check failure on line 7 in ocicrypt-rs/src/keywrap/keyprovider/mod.rs

View workflow job for this annotation

GitHub Actions / Check (1.76.0, s390x)

unused import: `Context`

Check failure on line 7 in ocicrypt-rs/src/keywrap/keyprovider/mod.rs

View workflow job for this annotation

GitHub Actions / Check (stable, ubuntu-latest)

unused import: `Context`

Check failure on line 7 in ocicrypt-rs/src/keywrap/keyprovider/mod.rs

View workflow job for this annotation

GitHub Actions / Check (1.76.0, ubuntu-latest)

unused import: `Context`
use base64::Engine;
use serde::Serialize;

Expand Down Expand Up @@ -325,28 +325,9 @@ impl KeyProviderKeyWrapper {
#[cfg(feature = "keywrap-keyprovider-ttrpc")]
{
let ttrpc = ttrpc.to_string();
let handler = std::thread::spawn(move || {
create_async_runtime()?.block_on(async {
KeyProviderKeyWrapProtocolOutput::from_ttrpc(_input, &ttrpc, OpKey::Wrap)
.map_err(|e| format!("{e}"))
})
});
let protocol_output = match handler.join() {
Ok(Ok(v)) => v,
Ok(Err(e)) => {
return Err(anyhow!(
"keyprovider: ttrpc provider failed to execute {} operation: {}",
OpKey::Wrap,
e
));
}
Err(e) => {
return Err(anyhow!(
"keyprovider: ttrpc provider failed to execute {} operation: {e:?}",
OpKey::Wrap,
));
}
};
let protocol_output =
KeyProviderKeyWrapProtocolOutput::from_ttrpc(_input, &ttrpc, OpKey::Wrap)
.context("keyprovider: ttrpc provider failed to execute Wrap")?;
if let Some(result) = protocol_output.key_wrap_results {
Ok(result.annotation)
} else {
Expand Down Expand Up @@ -423,22 +404,9 @@ impl KeyProviderKeyWrapper {
#[cfg(feature = "keywrap-keyprovider-ttrpc")]
{
let ttrpc = ttrpc.to_string();
let handler = std::thread::spawn(move || {
create_async_runtime()?.block_on(async {
KeyProviderKeyWrapProtocolOutput::from_ttrpc(_input, &ttrpc, OpKey::Unwrap)
.map_err(|e| {
format!(
"keyprovider: ttrpc provider failed to execute {} operation: {e}",
OpKey::Wrap,
)
})
})
});
match handler.join() {
Ok(Ok(v)) => Ok(v),
Ok(Err(e)) => bail!("failed to unwrap key by ttrpc, {e}"),
Err(e) => bail!("failed to unwrap key by ttrpc, {e:?}"),
}

KeyProviderKeyWrapProtocolOutput::from_ttrpc(_input, &ttrpc, OpKey::Unwrap)
.context("keyprovider: failed to unwrap key by ttrpc")
}
}

Expand Down Expand Up @@ -563,7 +531,6 @@ impl KeyWrapper for KeyProviderKeyWrapper {

#[cfg(any(
feature = "keywrap-keyprovider-grpc",
feature = "keywrap-keyprovider-ttrpc",
feature = "keywrap-keyprovider-native"
))]
fn create_async_runtime() -> std::result::Result<tokio::runtime::Runtime, String> {
Expand Down
53 changes: 25 additions & 28 deletions ocicrypt-rs/src/utils/ttrpc/keyprovider_ttrpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,74 +18,71 @@
use protobuf::{CodedInputStream, CodedOutputStream, Message};
use std::collections::HashMap;
use std::sync::Arc;
use async_trait::async_trait;

#[derive(Clone)]
pub struct KeyProviderServiceClient {
client: ::ttrpc::r#async::Client,
client: ::ttrpc::Client,
}

impl KeyProviderServiceClient {
pub fn new(client: ::ttrpc::r#async::Client) -> Self {
pub fn new(client: ::ttrpc::Client) -> Self {
KeyProviderServiceClient {
client,
}
}

pub async fn wrap_key(&self, ctx: ttrpc::context::Context, req: &super::keyprovider::KeyProviderKeyWrapProtocolInput) -> ::ttrpc::Result<super::keyprovider::KeyProviderKeyWrapProtocolOutput> {
pub fn wrap_key(&self, ctx: ttrpc::context::Context, req: &super::keyprovider::KeyProviderKeyWrapProtocolInput) -> ::ttrpc::Result<super::keyprovider::KeyProviderKeyWrapProtocolOutput> {
let mut cres = super::keyprovider::KeyProviderKeyWrapProtocolOutput::new();
::ttrpc::async_client_request!(self, ctx, req, "keyprovider.KeyProviderService", "WrapKey", cres);
::ttrpc::client_request!(self, ctx, req, "keyprovider.KeyProviderService", "WrapKey", cres);
Ok(cres)
}

pub async fn un_wrap_key(&self, ctx: ttrpc::context::Context, req: &super::keyprovider::KeyProviderKeyWrapProtocolInput) -> ::ttrpc::Result<super::keyprovider::KeyProviderKeyWrapProtocolOutput> {
pub fn un_wrap_key(&self, ctx: ttrpc::context::Context, req: &super::keyprovider::KeyProviderKeyWrapProtocolInput) -> ::ttrpc::Result<super::keyprovider::KeyProviderKeyWrapProtocolOutput> {
let mut cres = super::keyprovider::KeyProviderKeyWrapProtocolOutput::new();
::ttrpc::async_client_request!(self, ctx, req, "keyprovider.KeyProviderService", "UnWrapKey", cres);
::ttrpc::client_request!(self, ctx, req, "keyprovider.KeyProviderService", "UnWrapKey", cres);
Ok(cres)
}
}

struct WrapKeyMethod {
service: Arc<Box<dyn KeyProviderService + Send + Sync>>,
}

#[async_trait]
impl ::ttrpc::r#async::MethodHandler for WrapKeyMethod {
async fn handler(&self, ctx: ::ttrpc::r#async::TtrpcContext, req: ::ttrpc::Request) -> ::ttrpc::Result<::ttrpc::Response> {
::ttrpc::async_request_handler!(self, ctx, req, keyprovider, KeyProviderKeyWrapProtocolInput, wrap_key);
impl ::ttrpc::MethodHandler for WrapKeyMethod {
fn handler(&self, ctx: ::ttrpc::TtrpcContext, req: ::ttrpc::Request) -> ::ttrpc::Result<()> {
::ttrpc::request_handler!(self, ctx, req, keyprovider, KeyProviderKeyWrapProtocolInput, wrap_key);
Ok(())
}
}

struct UnWrapKeyMethod {
service: Arc<Box<dyn KeyProviderService + Send + Sync>>,
}

#[async_trait]
impl ::ttrpc::r#async::MethodHandler for UnWrapKeyMethod {
async fn handler(&self, ctx: ::ttrpc::r#async::TtrpcContext, req: ::ttrpc::Request) -> ::ttrpc::Result<::ttrpc::Response> {
::ttrpc::async_request_handler!(self, ctx, req, keyprovider, KeyProviderKeyWrapProtocolInput, un_wrap_key);
impl ::ttrpc::MethodHandler for UnWrapKeyMethod {
fn handler(&self, ctx: ::ttrpc::TtrpcContext, req: ::ttrpc::Request) -> ::ttrpc::Result<()> {
::ttrpc::request_handler!(self, ctx, req, keyprovider, KeyProviderKeyWrapProtocolInput, un_wrap_key);
Ok(())
}
}

#[async_trait]
pub trait KeyProviderService: Sync {
async fn wrap_key(&self, _ctx: &::ttrpc::r#async::TtrpcContext, _: super::keyprovider::KeyProviderKeyWrapProtocolInput) -> ::ttrpc::Result<super::keyprovider::KeyProviderKeyWrapProtocolOutput> {
pub trait KeyProviderService {
fn wrap_key(&self, _ctx: &::ttrpc::TtrpcContext, _: super::keyprovider::KeyProviderKeyWrapProtocolInput) -> ::ttrpc::Result<super::keyprovider::KeyProviderKeyWrapProtocolOutput> {
Err(::ttrpc::Error::RpcStatus(::ttrpc::get_status(::ttrpc::Code::NOT_FOUND, "/keyprovider.KeyProviderService/WrapKey is not supported".to_string())))
}
async fn un_wrap_key(&self, _ctx: &::ttrpc::r#async::TtrpcContext, _: super::keyprovider::KeyProviderKeyWrapProtocolInput) -> ::ttrpc::Result<super::keyprovider::KeyProviderKeyWrapProtocolOutput> {
fn un_wrap_key(&self, _ctx: &::ttrpc::TtrpcContext, _: super::keyprovider::KeyProviderKeyWrapProtocolInput) -> ::ttrpc::Result<super::keyprovider::KeyProviderKeyWrapProtocolOutput> {
Err(::ttrpc::Error::RpcStatus(::ttrpc::get_status(::ttrpc::Code::NOT_FOUND, "/keyprovider.KeyProviderService/UnWrapKey is not supported".to_string())))
}
}

pub fn create_key_provider_service(service: Arc<Box<dyn KeyProviderService + Send + Sync>>) -> HashMap<String, ::ttrpc::r#async::Service> {
let mut ret = HashMap::new();
pub fn create_key_provider_service(service: Arc<Box<dyn KeyProviderService + Send + Sync>>) -> HashMap<String, Box<dyn ::ttrpc::MethodHandler + Send + Sync>> {
let mut methods = HashMap::new();
let streams = HashMap::new();

methods.insert("WrapKey".to_string(),
Box::new(WrapKeyMethod{service: service.clone()}) as Box<dyn ::ttrpc::r#async::MethodHandler + Send + Sync>);
methods.insert("/keyprovider.KeyProviderService/WrapKey".to_string(),
Box::new(WrapKeyMethod{service: service.clone()}) as Box<dyn ::ttrpc::MethodHandler + Send + Sync>);

methods.insert("UnWrapKey".to_string(),
Box::new(UnWrapKeyMethod{service: service.clone()}) as Box<dyn ::ttrpc::r#async::MethodHandler + Send + Sync>);
methods.insert("/keyprovider.KeyProviderService/UnWrapKey".to_string(),
Box::new(UnWrapKeyMethod{service: service.clone()}) as Box<dyn ::ttrpc::MethodHandler + Send + Sync>);

ret.insert("keyprovider.KeyProviderService".to_string(), ::ttrpc::r#async::Service{ methods, streams });
ret
methods
}

0 comments on commit 4f9c7ca

Please sign in to comment.