Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: update kube and hyper #13533

Draft
wants to merge 5 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
981 changes: 439 additions & 542 deletions Cargo.lock

Large diffs are not rendered by default.

20 changes: 17 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,20 @@ members = [
lto = "thin"

[workspace.dependencies]
k8s-openapi = { version = "0.20", features = ["v1_22"] }
kube = { version = "0.87.1", default-features = false }
kubert = { version = "0.22", default-features = false }
k8s-openapi = { version = "0.24", features = ["v1_31"] }
kube = { version = "0.98", default-features = false }
kubert = { version = "0.23.0-alpha6", default-features = false }
prometheus-client = { version = "0.23", default-features = false }

[workspace.dependencies.k8s-gateway-api]
# TODO(ver): Remove this once we update to a proper generated version of the gateway api bindings.
git = "https://github.com/linkerd/k8s-gateway-api-rs"
features = ["experimental"]

[workspace.dependencies.linkerd2-proxy-api]
git = "https://github.com/linkerd/linkerd2-proxy-api"
branch = "ver/deps-http"
features = [
"inbound",
"outbound",
]
2 changes: 1 addition & 1 deletion policy-controller/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,6 @@ anyhow = "1"
async-trait = "0.1"
chrono = { version = "0.4.39", default-features = false }
futures = { version = "0.3", default-features = false, features = ["std"] }
http = "0.2"
http = "1"
ipnet = "2"
regex = "1"
10 changes: 5 additions & 5 deletions policy-controller/grpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,19 @@ publish = false
[dependencies]
async-stream = "0.3"
async-trait = "0.1"
http = "0.2"
http = "1"
drain = "0.1"
futures = { version = "0.3", default-features = false }
hyper = { version = "0.14", features = ["http2", "server", "tcp"] }
hyper = { version = "1", features = ["http2", "server"] }
linkerd-policy-controller-core = { path = "../core" }
maplit = "1"
prost-types = "0.12.6"
prost-types = "0.13"
tokio = { version = "1", features = ["macros"] }
tonic = { version = "0.10", default-features = false }
tonic = { version = "0.12", default-features = false }
tracing = "0.1"
serde = { version = "1", features = ["derive"] }
serde_json = "1"

[dependencies.linkerd2-proxy-api]
version = "0.15"
workspace = true
features = ["inbound", "outbound"]
2 changes: 1 addition & 1 deletion policy-controller/grpc/src/outbound/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ pub(crate) fn protocol(
}),
http1: Some(outbound::proxy_protocol::Http1 {
routes: routes.clone(),
failure_accrual: accrual.clone(),
failure_accrual: accrual,
}),
http2: Some(outbound::proxy_protocol::Http2 {
routes,
Expand Down
2 changes: 1 addition & 1 deletion policy-controller/k8s/api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ publish = false

[dependencies]
k8s-openapi = { workspace = true }
k8s-gateway-api = { version = "0.16", features = ["experimental"] }
k8s-gateway-api = { workspace = true, features = ["experimental"] }
kube = { workspace = true, default-features = false, features = [
"client",
"derive",
Expand Down
6 changes: 3 additions & 3 deletions policy-controller/k8s/index/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ ahash = "0.8"
anyhow = "1"
chrono = { version = "0.4.39", default-features = false }
futures = { version = "0.3", default-features = false }
http = "0.2"
http = "1"
kube = { workspace = true, default-features = false, features = [
"client",
"derive",
Expand All @@ -20,14 +20,14 @@ kubert = { workspace = true, default-features = false, features = ["index"] }
linkerd-policy-controller-core = { path = "../../core" }
linkerd-policy-controller-k8s-api = { path = "../api" }
parking_lot = "0.12"
prometheus-client = { version = "0.22.3", default-features = false }
prometheus-client = { workspace = true }
thiserror = "2"
tokio = { version = "1", features = ["macros", "rt", "sync"] }
tracing = "0.1"

[dev-dependencies]
chrono = { version = "0.4", default-features = false }
k8s-openapi = { version = "0.20", features = ["schemars"] }
k8s-openapi = { workspace = true, features = ["schemars"] }
maplit = "1"
tokio-stream = "0.1"
tokio-test = "0.4"
Expand Down
2 changes: 1 addition & 1 deletion policy-controller/k8s/status/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ kubert = { workspace = true, default-features = false, features = [
linkerd-policy-controller-core = { path = "../../core" }
linkerd-policy-controller-k8s-api = { path = "../api" }
parking_lot = "0.12"
prometheus-client = { version = "0.22.3", default-features = false }
prometheus-client = { workspace = true }
serde = "1"
serde_json = "1.0.135"
thiserror = "2"
Expand Down
10 changes: 7 additions & 3 deletions policy-controller/runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,21 @@ rustls-tls = ["kube/rustls-tls"]
[dependencies]
anyhow = "1"
async-trait = "0.1"
bytes = "1"
drain = "0.1"
futures = { version = "0.3", default-features = false }
k8s-openapi = { workspace = true }
hyper = { version = "0.14", features = ["http1", "http2", "runtime", "server"] }
http-body-util = "0.1"
hyper = { version = "1", features = ["http1", "http2", "server"] }
ipnet = { version = "2", default-features = false }
openssl = { version = "0.10.68", optional = true }
parking_lot = "0.12"
prometheus-client = { version = "0.22.3", default-features = false }
prometheus-client = { workspace = true }
serde = "1"
serde_json = "1"
thiserror = "2"
tokio-stream = { version = "0.1", features = ["sync"] }
tower = "0.4"
tracing = "0.1"
regex = "1"

Expand Down Expand Up @@ -56,6 +59,7 @@ features = [
"lease",
"prometheus-client",
"runtime",
"runtime-diagnostics",
"server",
"rustls-tls",
]
Expand All @@ -65,6 +69,6 @@ version = "1"
features = ["macros", "parking_lot", "rt", "rt-multi-thread", "signal"]

[dependencies.tonic]
version = "0.10"
version = "0.12"
default-features = false
features = ["transport"]
23 changes: 15 additions & 8 deletions policy-controller/runtime/src/admission.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,15 @@ use crate::k8s::policy::{
};
use anyhow::{anyhow, bail, ensure, Context, Result};
use futures::future;
use hyper::{body::Buf, http, Body, Request, Response};
use http_body_util::BodyExt;
use hyper::{http, Request, Response};
use k8s_openapi::api::core::v1::{Namespace, ServiceAccount};
use kube::{core::DynamicObject, Resource, ResourceExt};
use linkerd_policy_controller_core as core;
use linkerd_policy_controller_k8s_api::gateway::{self as k8s_gateway_api, GrpcRoute};
use linkerd_policy_controller_k8s_index::{self as index, outbound::index as outbound_index};
use serde::de::DeserializeOwned;
use std::{collections::BTreeMap, task};
use std::collections::BTreeMap;
use thiserror::Error;
use tracing::{debug, info, trace, warn};

Expand Down Expand Up @@ -49,31 +50,37 @@ trait Validate<T> {
) -> Result<()>;
}

type Body = http_body_util::Full<bytes::Bytes>;

// === impl AdmissionService ===

impl hyper::service::Service<Request<Body>> for Admission {
impl tower::Service<Request<hyper::body::Incoming>> for Admission {
type Response = Response<Body>;
type Error = Error;
type Future = future::BoxFuture<'static, Result<Response<Body>, Error>>;

fn poll_ready(&mut self, _cx: &mut task::Context<'_>) -> task::Poll<Result<(), Error>> {
task::Poll::Ready(Ok(()))
fn poll_ready(
&mut self,
_cx: &mut std::task::Context<'_>,
) -> std::task::Poll<std::result::Result<(), Self::Error>> {
std::task::Poll::Ready(Ok(()))
}

fn call(&mut self, req: Request<Body>) -> Self::Future {
fn call(&mut self, req: Request<hyper::body::Incoming>) -> Self::Future {
trace!(?req);
if req.method() != http::Method::POST || req.uri().path() != "/" {
return Box::pin(future::ok(
Response::builder()
.status(http::StatusCode::NOT_FOUND)
.body(Body::empty())
.body(Body::default())
.expect("not found response must be valid"),
));
}

let admission = self.clone();
Box::pin(async move {
let bytes = hyper::body::aggregate(req.into_body()).await?;
use bytes::Buf;
let bytes = req.into_body().collect().await?.to_bytes();
let review: Review = match serde_json::from_reader(bytes.reader()) {
Ok(review) => review,
Err(error) => {
Expand Down
102 changes: 54 additions & 48 deletions policy-controller/runtime/src/args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@ use futures::prelude::*;
use k8s::{api::apps::v1::Deployment, Client, ObjectMeta, Resource};
use k8s_openapi::api::coordination::v1 as coordv1;
use kube::{api::PatchParams, runtime::watcher};
use kubert::LeaseManager;
use prometheus_client::registry::Registry;
use std::{net::SocketAddr, sync::Arc};
use tokio::{sync::mpsc, time::Duration};
use tokio::{
sync::{mpsc, watch},
time::Duration,
};
use tonic::transport::Server;
use tracing::{info, info_span, instrument, Instrument};

Expand Down Expand Up @@ -179,18 +181,20 @@ impl Args {

let hostname =
std::env::var("HOSTNAME").expect("Failed to fetch `HOSTNAME` environment variable");
let params = kubert::lease::ClaimParams {
lease_duration: LEASE_DURATION,
renew_grace_period: RENEW_GRACE_PERIOD,
};

let lease = init_lease(
runtime.client(),
&control_plane_namespace,
let claims = init_lease(
&runtime,
&policy_deployment_name,
kubert::LeaseParams {
name: LEASE_NAME.to_string(),
namespace: control_plane_namespace.clone(),
claimant: hostname.clone(),
lease_duration: LEASE_DURATION,
renew_grace_period: RENEW_GRACE_PERIOD,
field_manager: Some("policy-controller".into()),
},
)
.await?;
let (claims, _task) = lease.spawn(hostname.clone(), params).await?;

// Build the status index which will maintain information necessary for
// updating the status field of policy resources.
Expand Down Expand Up @@ -463,60 +467,62 @@ async fn grpc(
Ok(())
}

async fn init_lease(client: Client, ns: &str, deployment_name: &str) -> Result<LeaseManager> {
async fn init_lease<T>(
runtime: &kubert::Runtime<T>,
deployment_name: &str,
params: kubert::LeaseParams,
) -> Result<watch::Receiver<Arc<kubert::lease::Claim>>> {
// Fetch the policy-controller deployment so that we can use it as an owner
// reference of the Lease.
let api = k8s::Api::<Deployment>::namespaced(client.clone(), ns);
let api = k8s::Api::<Deployment>::namespaced(runtime.client(), &params.namespace);
let deployment = api.get(deployment_name).await?;

let api = k8s::Api::namespaced(client, ns);
let params = PatchParams {
field_manager: Some("policy-controller".to_string()),
..Default::default()
let lease = coordv1::Lease {
metadata: ObjectMeta {
name: Some(params.name.clone()),
namespace: Some(params.namespace.clone()),
// Specifying a resource version of "0" means that we will
// only create the Lease if it does not already exist.
resource_version: Some("0".to_string()),
owner_references: Some(vec![deployment.controller_owner_ref(&()).unwrap()]),
labels: Some(
[
(
"linkerd.io/control-plane-component".to_string(),
"destination".to_string(),
),
(
"linkerd.io/control-plane-ns".to_string(),
params.namespace.clone(),
),
]
.into_iter()
.collect(),
),
..Default::default()
},
spec: None,
};
match api
match k8s::Api::<coordv1::Lease>::namespaced(runtime.client(), &params.namespace)
.patch(
LEASE_NAME,
&params,
&kube::api::Patch::Apply(coordv1::Lease {
metadata: ObjectMeta {
name: Some(LEASE_NAME.to_string()),
namespace: Some(ns.to_string()),
// Specifying a resource version of "0" means that we will
// only create the Lease if it does not already exist.
resource_version: Some("0".to_string()),
owner_references: Some(vec![deployment.controller_owner_ref(&()).unwrap()]),
labels: Some(
[
(
"linkerd.io/control-plane-component".to_string(),
"destination".to_string(),
),
("linkerd.io/control-plane-ns".to_string(), ns.to_string()),
]
.into_iter()
.collect(),
),
..Default::default()
},
spec: None,
}),
&PatchParams {
field_manager: params.field_manager.clone().map(Into::into),
..Default::default()
},
&kube::api::Patch::Apply(lease),
)
.await
{
Ok(lease) => tracing::info!(?lease, "Created Lease resource"),
Err(k8s::Error::Api(_)) => tracing::debug!("Lease already exists, no need to create it"),
Err(error) => {
tracing::error!(%error, "Failed to create Lease resource");
return Err(error.into());
}
};
// Create the lease manager used for trying to claim the policy
// controller write lease.
// todo: Do we need to use LeaseManager::field_manager here?
kubert::lease::LeaseManager::init(api, LEASE_NAME)
.await
.map_err(Into::into)

let (claim, _task) = runtime.spawn_lease(params).await?;
Ok(claim)
}

async fn api_resource_exists<T>(client: &Client) -> bool
Expand Down
12 changes: 8 additions & 4 deletions policy-test/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,13 @@ publish = false

[dependencies]
anyhow = "1"
hyper = { version = "0.14", features = ["client", "http2"] }
bytes = "1"
http-body-util = "0.1"
hyper = { version = "1", features = ["client", "http2"] }
hyper-util = { version = "0.1" }
futures = { version = "0.3", default-features = false }
ipnet = "2"
k8s-gateway-api = "0.16"
k8s-gateway-api = { workspace = true }
k8s-openapi = { workspace = true }
linkerd-policy-controller-core = { path = "../policy-controller/core" }
linkerd-policy-controller-k8s-api = { path = "../policy-controller/k8s/api" }
Expand All @@ -20,8 +23,9 @@ rand = "0.8"
serde = "1"
serde_json = "1"
schemars = "0.8"
tonic = { version = "0.10", default-features = false }
tonic = { version = "0.12", default-features = false }
tokio = { version = "1", features = ["macros", "rt"] }
tower = "0.4"
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }

Expand All @@ -31,7 +35,7 @@ default-features = false
features = ["client", "openssl-tls", "runtime", "ws"]

[dependencies.linkerd2-proxy-api]
version = "0.15"
workspace = true
features = ["inbound", "outbound"]

[dev-dependencies]
Expand Down
Loading
Loading