diff --git a/Cargo.toml b/Cargo.toml index b607653..505fd9f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,6 +24,8 @@ ring = ["dep:ring"] [dependencies] async-recursion = "1.1.1" +async-std = { version = "1.13.1", features = ["attributes"] } +async-stream = "0.3.6" async-trait = "0.1.88" base64 = "0.22.1" byteorder = "1.5.0" @@ -48,18 +50,16 @@ ring = { version = "0.17.14", optional = true, default-features = false, feature serde = { version = "1.0.219", features = ["derive"] } serde_json = "1.0.140" sha2 = { version = "0.10.8", optional = true } -tokio = { version = "1.45.1", features = ["full"] } -tokio-stream = "0.1.17" -tokio-util = { version = "0.7.15", features = ["io"] } urlencoding = "2.1.3" xmltree = "0.11.0" futures = "0.3.31" http = "1.3.1" [dev-dependencies] +tokio = { version = "1.45.1", features = ["full"] } minio_common = { path = "./common" } async-std = { version = "1.13.1", features = ["attributes", "tokio1"] } -clap = { version = "4.5.39", features = ["derive"] } +clap = { version = "4.5.40", features = ["derive"] } quickcheck = "1.0.3" criterion = "0.6.0" diff --git a/benches/s3/bench_bucket_tagging.rs b/benches/s3/bench_bucket_tagging.rs index d18437c..8f0ad00 100644 --- a/benches/s3/bench_bucket_tagging.rs +++ b/benches/s3/bench_bucket_tagging.rs @@ -20,8 +20,8 @@ use minio::s3::builders::{DeleteBucketTagging, GetBucketTagging, PutBucketTaggin use minio::s3::types::S3Api; use minio_common::example::create_tags_example; -pub(crate) fn bench_put_bucket_tagging(criterion: &mut Criterion) { - if skip_express_mode("bench_put_bucket_tagging") { +pub(crate) async fn bench_put_bucket_tagging(criterion: &mut Criterion) { + if skip_express_mode("bench_put_bucket_tagging").await { return; } benchmark_s3_api( @@ -34,8 +34,8 @@ pub(crate) fn bench_put_bucket_tagging(criterion: &mut Criterion) { }, ) } -pub(crate) fn bench_get_bucket_tagging(criterion: &mut Criterion) { - if skip_express_mode("bench_get_bucket_tagging") { +pub(crate) async fn bench_get_bucket_tagging(criterion: &mut Criterion) { + if skip_express_mode("bench_get_bucket_tagging").await { return; } benchmark_s3_api( @@ -54,8 +54,8 @@ pub(crate) fn bench_get_bucket_tagging(criterion: &mut Criterion) { |ctx| GetBucketTagging::new(ctx.client.clone(), ctx.bucket.clone()), ) } -pub(crate) fn bench_delete_bucket_tagging(criterion: &mut Criterion) { - if skip_express_mode("bench_delete_bucket_tagging") { +pub(crate) async fn bench_delete_bucket_tagging(criterion: &mut Criterion) { + if skip_express_mode("bench_delete_bucket_tagging").await { return; } benchmark_s3_api( diff --git a/benches/s3/bench_bucket_versioning.rs b/benches/s3/bench_bucket_versioning.rs index 23d004a..44394df 100644 --- a/benches/s3/bench_bucket_versioning.rs +++ b/benches/s3/bench_bucket_versioning.rs @@ -18,8 +18,8 @@ use crate::common_benches::{Ctx2, benchmark_s3_api, skip_express_mode}; use criterion::Criterion; use minio::s3::builders::{GetBucketVersioning, PutBucketVersioning, VersioningStatus}; -pub(crate) fn bench_get_bucket_versioning(criterion: &mut Criterion) { - if skip_express_mode("bench_get_bucket_versioning") { +pub(crate) async fn bench_get_bucket_versioning(criterion: &mut Criterion) { + if skip_express_mode("bench_get_bucket_versioning").await { return; } benchmark_s3_api( @@ -29,8 +29,8 @@ pub(crate) fn bench_get_bucket_versioning(criterion: &mut Criterion) { |ctx| GetBucketVersioning::new(ctx.client.clone(), ctx.bucket.clone()), ) } -pub(crate) fn bench_put_bucket_versioning(criterion: &mut Criterion) { - if skip_express_mode("bench_put_bucket_versioning") { +pub(crate) async fn bench_put_bucket_versioning(criterion: &mut Criterion) { + if skip_express_mode("bench_put_bucket_versioning").await { return; } benchmark_s3_api( diff --git a/benches/s3/bench_object_append.rs b/benches/s3/bench_object_append.rs index 0e5ae43..2171168 100644 --- a/benches/s3/bench_object_append.rs +++ b/benches/s3/bench_object_append.rs @@ -24,8 +24,8 @@ use minio_common::test_context::TestContext; use tokio::task; #[allow(dead_code)] -pub(crate) fn bench_object_append(criterion: &mut Criterion) { - if !TestContext::new_from_env().client.is_minio_express() { +pub(crate) async fn bench_object_append(criterion: &mut Criterion) { + if !TestContext::new_from_env().client.is_minio_express().await { println!("Skipping benchmark because it is NOT running in MinIO Express mode"); return; } diff --git a/benches/s3/bench_object_legal_hold.rs b/benches/s3/bench_object_legal_hold.rs index 3291166..dd139b4 100644 --- a/benches/s3/bench_object_legal_hold.rs +++ b/benches/s3/bench_object_legal_hold.rs @@ -19,8 +19,8 @@ use criterion::Criterion; use minio::s3::builders::{GetObjectLegalHold, PutObjectLegalHold}; use minio::s3::types::S3Api; -pub(crate) fn bench_put_object_legal_hold(criterion: &mut Criterion) { - if skip_express_mode("bench_put_object_legal_hold") { +pub(crate) async fn bench_put_object_legal_hold(criterion: &mut Criterion) { + if skip_express_mode("bench_put_object_legal_hold").await { return; } benchmark_s3_api( @@ -33,8 +33,8 @@ pub(crate) fn bench_put_object_legal_hold(criterion: &mut Criterion) { }, ) } -pub(crate) fn bench_get_object_legal_hold(criterion: &mut Criterion) { - if skip_express_mode("bench_get_object_legal_hold") { +pub(crate) async fn bench_get_object_legal_hold(criterion: &mut Criterion) { + if skip_express_mode("bench_get_object_legal_hold").await { return; } benchmark_s3_api( diff --git a/benches/s3/bench_object_lock_config.rs b/benches/s3/bench_object_lock_config.rs index a60b78b..83e6949 100644 --- a/benches/s3/bench_object_lock_config.rs +++ b/benches/s3/bench_object_lock_config.rs @@ -18,8 +18,8 @@ use criterion::Criterion; use minio::s3::builders::{DeleteObjectLockConfig, GetObjectLockConfig, PutObjectLockConfig}; use minio_common::example::create_object_lock_config_example; -pub(crate) fn bench_put_object_lock_config(criterion: &mut Criterion) { - if skip_express_mode("bench_put_object_lock_config") { +pub(crate) async fn bench_put_object_lock_config(criterion: &mut Criterion) { + if skip_express_mode("bench_put_object_lock_config").await { return; } benchmark_s3_api( @@ -32,8 +32,8 @@ pub(crate) fn bench_put_object_lock_config(criterion: &mut Criterion) { }, ) } -pub(crate) fn bench_get_object_lock_config(criterion: &mut Criterion) { - if skip_express_mode("bench_get_object_lock_config") { +pub(crate) async fn bench_get_object_lock_config(criterion: &mut Criterion) { + if skip_express_mode("bench_get_object_lock_config").await { return; } benchmark_s3_api( @@ -43,8 +43,8 @@ pub(crate) fn bench_get_object_lock_config(criterion: &mut Criterion) { |ctx| GetObjectLockConfig::new(ctx.client.clone(), ctx.bucket.clone()), ) } -pub(crate) fn bench_delete_object_lock_config(criterion: &mut Criterion) { - if skip_express_mode("bench_delete_object_lock_config") { +pub(crate) async fn bench_delete_object_lock_config(criterion: &mut Criterion) { + if skip_express_mode("bench_delete_object_lock_config").await { return; } benchmark_s3_api( diff --git a/benches/s3/bench_object_retention.rs b/benches/s3/bench_object_retention.rs index 42a73ee..40c3054 100644 --- a/benches/s3/bench_object_retention.rs +++ b/benches/s3/bench_object_retention.rs @@ -21,8 +21,8 @@ use minio::s3::response::PutObjectRetentionResponse; use minio::s3::types::{RetentionMode, S3Api}; use minio::s3::utils::utc_now; -pub(crate) fn bench_put_object_retention(criterion: &mut Criterion) { - if skip_express_mode("bench_put_object_retention") { +pub(crate) async fn bench_put_object_retention(criterion: &mut Criterion) { + if skip_express_mode("bench_put_object_retention").await { return; } benchmark_s3_api( @@ -36,8 +36,8 @@ pub(crate) fn bench_put_object_retention(criterion: &mut Criterion) { }, ) } -pub(crate) fn bench_get_object_retention(criterion: &mut Criterion) { - if skip_express_mode("bench_get_object_retention") { +pub(crate) async fn bench_get_object_retention(criterion: &mut Criterion) { + if skip_express_mode("bench_get_object_retention").await { return; } benchmark_s3_api( diff --git a/benches/s3/bench_object_tagging.rs b/benches/s3/bench_object_tagging.rs index cb260a5..e733ac1 100644 --- a/benches/s3/bench_object_tagging.rs +++ b/benches/s3/bench_object_tagging.rs @@ -21,8 +21,8 @@ use minio::s3::response::PutObjectTaggingResponse; use minio::s3::types::S3Api; use minio_common::example::create_tags_example; -pub(crate) fn bench_put_object_tagging(criterion: &mut Criterion) { - if skip_express_mode("bench_put_object_tagging") { +pub(crate) async fn bench_put_object_tagging(criterion: &mut Criterion) { + if skip_express_mode("bench_put_object_tagging").await { return; } benchmark_s3_api( @@ -35,8 +35,8 @@ pub(crate) fn bench_put_object_tagging(criterion: &mut Criterion) { }, ) } -pub(crate) fn bench_get_object_tagging(criterion: &mut Criterion) { - if skip_express_mode("bench_get_object_tagging") { +pub(crate) async fn bench_get_object_tagging(criterion: &mut Criterion) { + if skip_express_mode("bench_get_object_tagging").await { return; } benchmark_s3_api( diff --git a/benches/s3/common_benches.rs b/benches/s3/common_benches.rs index 514f9cf..435a784 100644 --- a/benches/s3/common_benches.rs +++ b/benches/s3/common_benches.rs @@ -167,8 +167,8 @@ pub(crate) fn benchmark_s3_api( group.finish(); } -pub(crate) fn skip_express_mode(bench_name: &str) -> bool { - let skip = TestContext::new_from_env().client.is_minio_express(); +pub(crate) async fn skip_express_mode(bench_name: &str) -> bool { + let skip = TestContext::new_from_env().client.is_minio_express().await; if skip { println!("Skipping benchmark '{}' (MinIO Express mode)", bench_name); } diff --git a/common/Cargo.toml b/common/Cargo.toml index b2de269..0cec188 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -5,15 +5,15 @@ edition = "2024" [dependencies] minio = {path = ".." } -tokio = { version = "1.44.1", features = ["full"] } -tokio-stream = "0.1.17" +tokio = { version = "1.45.1", features = ["full"] } async-std = "1.13.1" rand = { version = "0.8.5", features = ["small_rng"] } bytes = "1.10.1" log = "0.4.27" -chrono = "0.4.40" -reqwest = "0.12.15" +chrono = "0.4.41" +reqwest = "0.12.20" http = "1.3.1" +futures = "0.3.31" [lib] name = "minio_common" diff --git a/common/src/rand_src.rs b/common/src/rand_src.rs index 591a5dd..6e9135f 100644 --- a/common/src/rand_src.rs +++ b/common/src/rand_src.rs @@ -13,13 +13,15 @@ // See the License for the specific language governing permissions and // limitations under the License. +use async_std::stream::Stream; use async_std::task; use bytes::Bytes; -use rand::SeedableRng; +use futures::io::AsyncRead; use rand::prelude::SmallRng; +use rand::{RngCore, SeedableRng}; use std::io; -use tokio::io::AsyncRead; -use tokio_stream::Stream; +use std::pin::Pin; +use std::task::{Context, Poll}; pub struct RandSrc { size: u64, @@ -64,26 +66,21 @@ impl Stream for RandSrc { impl AsyncRead for RandSrc { fn poll_read( - self: std::pin::Pin<&mut Self>, - _cx: &mut task::Context<'_>, - read_buf: &mut tokio::io::ReadBuf<'_>, - ) -> task::Poll> { - let buf = read_buf.initialize_unfilled(); - let bytes_read = match self.size > (buf.len() as u64) { - true => buf.len(), - false => self.size as usize, - }; + mut self: Pin<&mut Self>, + _cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll> { + let this = self.as_mut().get_mut(); - let this = self.get_mut(); - - if bytes_read > 0 { - let random: &mut dyn rand::RngCore = &mut this.rng; - random.fill_bytes(&mut buf[0..bytes_read]); + if this.size == 0 { + return Poll::Ready(Ok(0)); // EOF } - this.size -= bytes_read as u64; + let to_read = std::cmp::min(this.size as usize, buf.len()); - read_buf.advance(bytes_read); - task::Poll::Ready(Ok(())) + this.rng.fill_bytes(&mut buf[..to_read]); + this.size -= to_read as u64; + + Poll::Ready(Ok(to_read)) } } diff --git a/examples/append_object.rs b/examples/append_object.rs index 0bccaf0..2617799 100644 --- a/examples/append_object.rs +++ b/examples/append_object.rs @@ -28,7 +28,7 @@ async fn main() -> Result<(), Box> { env_logger::init(); // Note: set environment variable RUST_LOG="INFO" to log info and higher let client: Client = create_client_on_localhost()?; - if !client.is_minio_express() { + if !client.is_minio_express().await { println!("Need (MinIO) Express mode to run this example"); return Ok(()); } diff --git a/src/s3/builders/delete_objects.rs b/src/s3/builders/delete_objects.rs index 434595b..3410e1e 100644 --- a/src/s3/builders/delete_objects.rs +++ b/src/s3/builders/delete_objects.rs @@ -15,14 +15,6 @@ //! Builders for RemoveObject APIs. -use async_trait::async_trait; -use bytes::Bytes; -use futures_util::{Stream, StreamExt, stream as futures_stream}; -use http::Method; -use std::pin::Pin; - -use tokio_stream::iter as stream_iter; - use crate::s3::client::MAX_MULTIPART_COUNT; use crate::s3::multimap::{Multimap, MultimapExt}; use crate::s3::response::DeleteError; @@ -35,6 +27,12 @@ use crate::s3::{ types::{S3Api, S3Request, ToS3Request, ToStream}, utils::{check_bucket_name, md5sum_hash}, }; +use async_trait::async_trait; +use bytes::Bytes; +use futures_util::stream::iter; +use futures_util::{Stream, StreamExt, stream as futures_stream}; +use http::Method; +use std::pin::Pin; // region: object-to-delete @@ -260,7 +258,7 @@ impl ToS3Request for DeleteObjects { data.push_str(""); } data.push_str(""); - let data: Bytes = data.into(); + let bytes: Bytes = data.into(); let mut headers: Multimap = self.extra_headers.unwrap_or_default(); { @@ -268,7 +266,7 @@ impl ToS3Request for DeleteObjects { headers.add("x-amz-bypass-governance-retention", "true"); } headers.add("Content-Type", "application/xml"); - headers.add("Content-MD5", md5sum_hash(data.as_ref())); + headers.add("Content-MD5", md5sum_hash(bytes.as_ref())); } Ok(S3Request::new(self.client, Method::POST) @@ -276,7 +274,7 @@ impl ToS3Request for DeleteObjects { .bucket(Some(self.bucket)) .query_params(insert(self.extra_query_params, "delete")) .headers(headers) - .body(Some(data.into()))) + .body(Some(bytes.into()))) } } @@ -296,7 +294,7 @@ impl ObjectsStream { impl From for ObjectsStream { fn from(delete_object: ObjectToDelete) -> Self { - Self::from_stream(stream_iter(std::iter::once(delete_object))) + Self::from_stream(iter(std::iter::once(delete_object))) } } @@ -305,7 +303,7 @@ where I: Iterator + Send + Sync + 'static, { fn from(keys: I) -> Self { - Self::from_stream(stream_iter(keys)) + Self::from_stream(iter(keys)) } } diff --git a/src/s3/builders/get_presigned_object_url.rs b/src/s3/builders/get_presigned_object_url.rs index a600934..527fa60 100644 --- a/src/s3/builders/get_presigned_object_url.rs +++ b/src/s3/builders/get_presigned_object_url.rs @@ -72,7 +72,10 @@ impl GetPresignedObjectUrl { check_bucket_name(&self.bucket, true)?; check_object_name(&self.object)?; - let region: String = self.client.get_region_cached(&self.bucket, &self.region)?; + let region: String = self + .client + .get_region_cached(&self.bucket, &self.region) + .await?; let mut query_params: Multimap = self.extra_query_params.unwrap_or_default(); query_params.add_version(self.version_id.clone()); diff --git a/src/s3/builders/get_presigned_policy_form_data.rs b/src/s3/builders/get_presigned_policy_form_data.rs index b2265a5..0ee6a1b 100644 --- a/src/s3/builders/get_presigned_policy_form_data.rs +++ b/src/s3/builders/get_presigned_policy_form_data.rs @@ -35,10 +35,10 @@ impl GetPresignedPolicyFormData { } pub async fn send(self) -> Result, Error> { - // NOTE: this send function is async to be comparable with other functions... let region: String = self .client - .get_region_cached(&self.policy.bucket, &self.policy.region)?; + .get_region_cached(&self.policy.bucket, &self.policy.region) + .await?; let creds: Credentials = self.client.shared.provider.as_ref().unwrap().fetch(); self.policy.form_data( diff --git a/src/s3/builders/put_bucket_lifecycle.rs b/src/s3/builders/put_bucket_lifecycle.rs index 8caa934..76793f4 100644 --- a/src/s3/builders/put_bucket_lifecycle.rs +++ b/src/s3/builders/put_bucket_lifecycle.rs @@ -18,7 +18,6 @@ use crate::s3::error::Error; use crate::s3::lifecycle_config::LifecycleConfig; use crate::s3::multimap::{Multimap, MultimapExt}; use crate::s3::response::PutBucketLifecycleResponse; -use crate::s3::segmented_bytes::SegmentedBytes; use crate::s3::types::{S3Api, S3Request, ToS3Request}; use crate::s3::utils::{check_bucket_name, insert, md5sum_hash}; use bytes::Bytes; @@ -81,14 +80,13 @@ impl ToS3Request for PutBucketLifecycle { let mut headers: Multimap = self.extra_headers.unwrap_or_default(); let bytes: Bytes = self.config.to_xml().into(); - headers.add("Content-MD5", md5sum_hash(&bytes)); - let body: Option = Some(SegmentedBytes::from(bytes)); + headers.add("Content-MD5", md5sum_hash(bytes.as_ref())); Ok(S3Request::new(self.client, Method::PUT) .region(self.region) .bucket(Some(self.bucket)) .query_params(insert(self.extra_query_params, "lifecycle")) .headers(headers) - .body(body)) + .body(Some(bytes.into()))) } } diff --git a/src/s3/builders/put_object.rs b/src/s3/builders/put_object.rs index ede2ee4..54c709c 100644 --- a/src/s3/builders/put_object.rs +++ b/src/s3/builders/put_object.rs @@ -283,9 +283,9 @@ impl ToS3Request for CompleteMultipartUpload { } } - // Set capacity of the byte-buffer based on the part count - attempting + // Set the capacity of the byte-buffer based on the part count - attempting // to avoid extra allocations when building the XML payload. - let data: Bytes = { + let bytes: Bytes = { let mut data = BytesMut::with_capacity(100 * self.parts.len() + 100); data.extend_from_slice(b""); for part in self.parts.iter() { @@ -302,7 +302,7 @@ impl ToS3Request for CompleteMultipartUpload { let mut headers: Multimap = self.extra_headers.unwrap_or_default(); { headers.add("Content-Type", "application/xml"); - headers.add("Content-MD5", md5sum_hash(data.as_ref())); + headers.add("Content-MD5", md5sum_hash(bytes.as_ref())); } let mut query_params: Multimap = self.extra_query_params.unwrap_or_default(); query_params.add("uploadId", self.upload_id); @@ -313,7 +313,7 @@ impl ToS3Request for CompleteMultipartUpload { .object(Some(self.object)) .query_params(query_params) .headers(headers) - .body(Some(data.into()))) + .body(Some(bytes.into()))) } } // endregion: complete-multipart-upload diff --git a/src/s3/builders/put_object_legal_hold.rs b/src/s3/builders/put_object_legal_hold.rs index 52f6577..ccdd4b0 100644 --- a/src/s3/builders/put_object_legal_hold.rs +++ b/src/s3/builders/put_object_legal_hold.rs @@ -17,7 +17,6 @@ use crate::s3::Client; use crate::s3::error::Error; use crate::s3::multimap::{Multimap, MultimapExt}; use crate::s3::response::PutObjectLegalHoldResponse; -use crate::s3::segmented_bytes::SegmentedBytes; use crate::s3::types::{S3Api, S3Request, ToS3Request}; use crate::s3::utils::{check_bucket_name, check_object_name, insert, md5sum_hash}; use bytes::Bytes; @@ -88,10 +87,10 @@ impl ToS3Request for PutObjectLegalHold { Some(true) => "ON", _ => "OFF", }; + let bytes: Bytes = Bytes::from(payload); // TODO consider const payload with precalculated md5 - headers.add("Content-MD5", md5sum_hash(payload.as_ref())); - let body: Option = Some(SegmentedBytes::from(Bytes::from(payload))); + headers.add("Content-MD5", md5sum_hash(bytes.as_ref())); Ok(S3Request::new(self.client, Method::PUT) .region(self.region) @@ -99,6 +98,6 @@ impl ToS3Request for PutObjectLegalHold { .query_params(query_params) .headers(headers) .object(Some(self.object)) - .body(body)) + .body(Some(bytes.into()))) } } diff --git a/src/s3/builders/put_object_retention.rs b/src/s3/builders/put_object_retention.rs index f24e733..9db2764 100644 --- a/src/s3/builders/put_object_retention.rs +++ b/src/s3/builders/put_object_retention.rs @@ -17,7 +17,6 @@ use crate::s3::Client; use crate::s3::error::Error; use crate::s3::multimap::{Multimap, MultimapExt}; use crate::s3::response::PutObjectRetentionResponse; -use crate::s3::segmented_bytes::SegmentedBytes; use crate::s3::types::{RetentionMode, S3Api, S3Request, ToS3Request}; use crate::s3::utils::{ UtcTime, check_bucket_name, check_object_name, insert, md5sum_hash, to_iso8601utc, @@ -108,7 +107,7 @@ impl ToS3Request for PutObjectRetention { } } - let data: String = { + let bytes: Bytes = { let mut data: String = "".into(); if let Some(v) = &self.retention_mode { data.push_str(""); @@ -121,14 +120,14 @@ impl ToS3Request for PutObjectRetention { data.push_str(""); } data.push_str(""); - data + Bytes::from(data) }; let mut headers: Multimap = self.extra_headers.unwrap_or_default(); if self.bypass_governance_mode { headers.add("x-amz-bypass-governance-retention", "true"); } - headers.add("Content-MD5", md5sum_hash(data.as_ref())); + headers.add("Content-MD5", md5sum_hash(bytes.as_ref())); let mut query_params: Multimap = insert(self.extra_query_params, "retention"); query_params.add_version(self.version_id); @@ -139,6 +138,6 @@ impl ToS3Request for PutObjectRetention { .query_params(query_params) .headers(headers) .object(Some(self.object)) - .body(Some(SegmentedBytes::from(Bytes::from(data))))) + .body(Some(bytes.into()))) } } diff --git a/src/s3/builders/select_object_content.rs b/src/s3/builders/select_object_content.rs index d94f539..0c5463d 100644 --- a/src/s3/builders/select_object_content.rs +++ b/src/s3/builders/select_object_content.rs @@ -17,7 +17,6 @@ use crate::s3::Client; use crate::s3::error::Error; use crate::s3::multimap::{Multimap, MultimapExt}; use crate::s3::response::SelectObjectContentResponse; -use crate::s3::segmented_bytes::SegmentedBytes; use crate::s3::sse::SseCustomerKey; use crate::s3::types::{S3Api, S3Request, SelectRequest, ToS3Request}; use crate::s3::utils::{check_bucket_name, check_object_name, insert, md5sum_hash}; @@ -100,9 +99,7 @@ impl ToS3Request for SelectObjectContent { return Err(Error::SseTlsRequired(None)); } } - let region: String = self.client.get_region_cached(&self.bucket, &self.region)?; - let data = self.request.to_xml(); - let bytes: Bytes = data.into(); + let bytes: Bytes = self.request.to_xml().into(); let mut headers: Multimap = self.extra_headers.unwrap_or_default(); headers.add("Content-MD5", md5sum_hash(bytes.as_ref())); @@ -110,14 +107,12 @@ impl ToS3Request for SelectObjectContent { let mut query_params: Multimap = insert(self.extra_query_params, "select"); query_params.add("select-type", "2"); - let body: Option = Some(SegmentedBytes::from(bytes)); - Ok(S3Request::new(self.client, Method::POST) - .region(Some(region)) + .region(self.region) .bucket(Some(self.bucket)) .query_params(query_params) - .object(Some(self.object)) .headers(headers) - .body(body)) + .object(Some(self.object)) + .body(Some(bytes.into()))) } } diff --git a/src/s3/client.rs b/src/s3/client.rs index 69f1a1f..50cbe84 100644 --- a/src/s3/client.rs +++ b/src/s3/client.rs @@ -36,9 +36,7 @@ use dashmap::DashMap; use http::HeaderMap; use hyper::http::Method; use rand::Rng; -use rand::distributions::Alphanumeric; use reqwest::Body; -use tokio::task; mod append_object; mod bucket_exists; @@ -257,7 +255,7 @@ impl Client { .build() } - /// Returns whether is client uses an AWS host. + /// Returns whether this client uses an AWS host. pub fn is_aws_host(&self) -> bool { self.shared.base_url.is_aws_host() } @@ -268,44 +266,38 @@ impl Client { } /// Returns whether this client is configured to use the express endpoint and is minio enterprise. - pub fn is_minio_express(&self) -> bool { - if self.shared.express.get().is_some() { - *self.shared.express.get().unwrap() + pub async fn is_minio_express(&self) -> bool { + if let Some(val) = self.shared.express.get() { + *val } else { - task::block_in_place(|| match tokio::runtime::Runtime::new() { - Ok(rt) => { - // create a random bucket name, and check if it exists, - // we are not interested in the result, just the headers - // which will contain the server type + // Create a random bucket name + let bucket_name: String = rand::thread_rng() + .sample_iter(&rand::distributions::Alphanumeric) + .take(20) + .map(char::from) + .collect::() + .to_lowercase(); - let bucket_name: String = rand::thread_rng() - .sample_iter(&Alphanumeric) - .take(20) - .map(char::from) - .collect::() - .to_lowercase(); - - let express: bool = rt.block_on(async { - match BucketExists::new(self.clone(), bucket_name).send().await { - Ok(v) => { - if let Some(server) = v.headers.get("server") { - if let Ok(s) = server.to_str() { - return s - .eq_ignore_ascii_case("MinIO Enterprise/S3Express"); - } - } - } - Err(e) => { - println!("is_express_internal: error: {e}\nassume false"); - } + let express = match BucketExists::new(self.clone(), bucket_name).send().await { + Ok(v) => { + if let Some(server) = v.headers.get("server") { + if let Ok(s) = server.to_str() { + s.eq_ignore_ascii_case("MinIO Enterprise/S3Express") + } else { + false } + } else { false - }); - self.shared.express.set(express).unwrap_or_default(); - express + } } - Err(_) => false, - }) + Err(e) => { + log::warn!("is_express_internal: error: {e}, assume false"); + false + } + }; + + self.shared.express.set(express).unwrap_or_default(); + express } } diff --git a/src/s3/client/delete_bucket.rs b/src/s3/client/delete_bucket.rs index b53cfc0..8c1e028 100644 --- a/src/s3/client/delete_bucket.rs +++ b/src/s3/client/delete_bucket.rs @@ -55,7 +55,7 @@ impl Client { bucket: S, ) -> Result { let bucket: String = bucket.into(); - if self.is_minio_express() { + if self.is_minio_express().await { let mut stream = self.list_objects(&bucket).to_stream().await; while let Some(items) = stream.next().await { diff --git a/src/s3/client/get_region.rs b/src/s3/client/get_region.rs index 21ac285..634379e 100644 --- a/src/s3/client/get_region.rs +++ b/src/s3/client/get_region.rs @@ -19,8 +19,6 @@ use crate::s3::error::Error; use crate::s3::response::GetRegionResponse; use crate::s3::types::S3Api; -use tokio::task; - impl Client { /// Creates a [`GetRegion`] request builder. /// @@ -50,7 +48,7 @@ impl Client { /// Retrieves the region for the specified bucket name from the cache. /// If the region is not found in the cache, it is fetched via a call to S3 or MinIO /// and then stored in the cache for future lookups. - pub async fn get_region_cached_async>( + pub async fn get_region_cached>( &self, bucket: S, region: &Option, // the region as provided by the S3Request @@ -98,17 +96,4 @@ impl Client { .insert(bucket, resolved_region.clone()); Ok(resolved_region) } - - /// Retrieves the region for the specified bucket name from the cache. - /// If the region is not found in the cache, it is fetched via a call to S3 or MinIO - /// and then stored in the cache for future lookups. - pub fn get_region_cached( - &self, - bucket: &str, - region: &Option, - ) -> Result { - task::block_in_place(|| { - tokio::runtime::Runtime::new()?.block_on(self.get_region_cached_async(bucket, region)) - }) - } } diff --git a/src/s3/error.rs b/src/s3/error.rs index c5a238b..64a9622 100644 --- a/src/s3/error.rs +++ b/src/s3/error.rs @@ -88,7 +88,7 @@ impl ErrorCode { /// Error response for S3 operations pub struct ErrorResponse { /// Headers as returned by the server. - pub headers: HeaderMap, + pub(crate) headers: HeaderMap, pub code: ErrorCode, pub message: String, pub resource: String, @@ -129,11 +129,16 @@ pub enum Error { StrError(reqwest::header::ToStrError), IntError(std::num::ParseIntError), BoolError(std::str::ParseBoolError), - Utf8Error(alloc::string::FromUtf8Error), + + Utf8Error(Box), + /// Occurs when converting Vec to String (e.g. String::from_utf8) + //FromUtf8Error(alloc::string::FromUtf8Error), + /// Occurs when converting &[u8] to &str (e.g. std::str::from_utf8) + //Utf8Error(std::str::Utf8Error), JsonError(serde_json::Error), XmlError(String), - InvalidBucketName(String), InvalidBaseUrl(String), + InvalidBucketName(String), UrlBuildError(String), RegionMismatch(String, String), S3Error(ErrorResponse), @@ -198,6 +203,7 @@ impl fmt::Display for Error { Error::IntError(e) => write!(f, "{e}"), Error::BoolError(e) => write!(f, "{e}"), Error::Utf8Error(e) => write!(f, "{e}"), + //Error::FromUtf8Error(e) => write!(f, "{e}"), Error::JsonError(e) => write!(f, "{e}"), Error::XmlError(m) => write!(f, "{m}"), Error::InvalidBucketName(m) => write!(f, "{m}"), @@ -397,7 +403,13 @@ impl From for Error { impl From for Error { fn from(err: alloc::string::FromUtf8Error) -> Self { - Error::Utf8Error(err) + Error::Utf8Error(err.into()) + } +} + +impl From for Error { + fn from(err: std::str::Utf8Error) -> Self { + Error::Utf8Error(err.into()) } } diff --git a/src/s3/object_content.rs b/src/s3/object_content.rs index 5e4ac4a..6bd5a37 100644 --- a/src/s3/object_content.rs +++ b/src/s3/object_content.rs @@ -13,15 +13,12 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::path::PathBuf; -use std::{ffi::OsString, path::Path, pin::Pin}; - +use async_std::io::{ReadExt, WriteExt}; use bytes::Bytes; -use futures_util::Stream; +use futures::stream::{self, Stream, StreamExt}; use rand::prelude::random; -use tokio::fs; -use tokio::io::AsyncWriteExt; -use tokio_stream::StreamExt; +use std::path::PathBuf; +use std::{ffi::OsString, fs, path::Path, pin::Pin}; use crate::s3::segmented_bytes::SegmentedBytes; #[cfg(test)] @@ -151,15 +148,30 @@ impl ObjectContent { ) -> IoResult<(Pin> + Send>>, Size)> { match self.0 { ObjectContentInner::Stream(r, size) => Ok((r, size)), + ObjectContentInner::FilePath(path) => { - let file = fs::File::open(&path).await?; - let size = file.metadata().await?.len(); - let r = tokio_util::io::ReaderStream::new(file); - Ok((Box::pin(r), Some(size).into())) + let mut file = async_std::fs::File::open(&path).await?; + let metadata = file.metadata().await?; + let size = metadata.len(); + + // Define a stream that reads the file in chunks + let stream = async_stream::try_stream! { + let mut buf = vec![0u8; 8192]; + loop { + let n = file.read(&mut buf).await?; + if n == 0 { + break; + } + yield Bytes::copy_from_slice(&buf[..n]); + } + }; + + Ok((Box::pin(stream), Some(size).into())) } + ObjectContentInner::Bytes(sb) => { let k = sb.len(); - let r = Box::pin(tokio_stream::iter(sb.into_iter().map(Ok))); + let r = Box::pin(stream::iter(sb.into_iter().map(Ok))); Ok((r, Some(k as u64).into())) } } @@ -203,7 +215,7 @@ impl ObjectContent { )) })?; if !parent_dir.is_dir() { - fs::create_dir_all(parent_dir).await?; + async_std::fs::create_dir_all(parent_dir).await?; } let file_name = file_path.file_name().ok_or(std::io::Error::other( "could not get filename component of path", @@ -215,7 +227,7 @@ impl ObjectContent { .join(Path::new(tmp_file_name.as_os_str())); let mut total_bytes_written = 0; - let mut fp = fs::OpenOptions::new() + let mut fp = async_std::fs::OpenOptions::new() .write(true) .create(true) // Ensures that the file will be created if it does not already exist .truncate(true) // Clears the contents (truncates the file size to 0) before writing @@ -231,7 +243,7 @@ impl ObjectContent { fp.write_all(&bytes).await?; } fp.flush().await?; - fs::rename(&tmp_file_path, file_path).await?; + fs::rename(&tmp_file_path, file_path)?; Ok(total_bytes_written) } } @@ -263,7 +275,7 @@ impl ContentStream { pub fn empty() -> Self { Self { - r: Box::pin(tokio_stream::iter(vec![])), + r: Box::pin(stream::iter(vec![])), extra: None, size: Some(0).into(), } diff --git a/src/s3/response/listen_bucket_notification.rs b/src/s3/response/listen_bucket_notification.rs index f0e96ba..e135565 100644 --- a/src/s3/response/listen_bucket_notification.rs +++ b/src/s3/response/listen_bucket_notification.rs @@ -13,17 +13,12 @@ // See the License for the specific language governing permissions and // limitations under the License. -use futures_util::{Stream, TryStreamExt, stream}; +use crate::s3::error::Error; +use crate::s3::types::{FromS3Response, NotificationRecords, S3Request}; +use crate::s3::utils::take_bucket; +use futures_util::{Stream, StreamExt, TryStreamExt}; use http::HeaderMap; use std::mem; -use tokio::io::AsyncBufReadExt; -use tokio_util::io::StreamReader; - -use crate::s3::utils::take_bucket; -use crate::s3::{ - error::Error, - types::{FromS3Response, NotificationRecords, S3Request}, -}; /// Response of /// [listen _bucket_notification()](crate::s3::client::Client::listen_bucket_notification) @@ -54,31 +49,48 @@ impl FromS3Response let mut resp = resp?; let headers: HeaderMap = mem::take(resp.headers_mut()); - let stream_reader = StreamReader::new(resp.bytes_stream().map_err(std::io::Error::other)); + // A simple stateful decoder that buffers bytes and yields complete lines + let byte_stream = resp.bytes_stream(); // This is a futures::Stream> - let record_stream = Box::pin(stream::unfold( - stream_reader, - move |mut reader| async move { - loop { - let mut line = String::new(); - return match reader.read_line(&mut line).await { - Ok(n) => { - if n == 0 { - return None; - } - let s = line.trim(); - if s.is_empty() { - continue; - } - let records_res: Result = - serde_json::from_str(s).map_err(|e| e.into()); - Some((records_res, reader)) - } - Err(e) => Some((Err(e.into()), reader)), - }; + let line_stream = Box::pin(async_stream::try_stream! { + let mut buf = Vec::new(); + let mut cursor = 0; + + let mut stream = byte_stream.map_err(Error::from).boxed(); + + while let Some(chunk) = stream.next().await { + let chunk = chunk?; + buf.extend_from_slice(&chunk); + + while let Some(pos) = buf[cursor..].iter().position(|&b| b == b'\n') { + let end = cursor + pos; + let line_bytes = &buf[..end]; + let line = std::str::from_utf8(line_bytes)?.trim(); + + if !line.is_empty() { + let parsed: NotificationRecords = serde_json::from_str(line)?; + yield parsed; + } + + cursor = end + 1; } - }, - )); + + // Shift buffer left if needed + if cursor > 0 { + buf.drain(..cursor); + cursor = 0; + } + } + + // Drain the remaining buffer if not empty + if !buf.is_empty() { + let line = std::str::from_utf8(&buf)?.trim(); + if !line.is_empty() { + let parsed: NotificationRecords = serde_json::from_str(line)?; + yield parsed; + } + } + }); Ok(( ListenBucketNotificationResponse { @@ -86,7 +98,7 @@ impl FromS3Response region: req.inner_region, bucket: take_bucket(req.bucket)?, }, - Box::new(record_stream), + Box::new(line_stream), )) } } diff --git a/src/s3/types.rs b/src/s3/types.rs index bf45fe0..a15ed2b 100644 --- a/src/s3/types.rs +++ b/src/s3/types.rs @@ -87,16 +87,16 @@ impl S3Request { self } - fn compute_inner_region(&self) -> Result { + async fn compute_inner_region(&self) -> Result { Ok(match &self.bucket { - Some(b) => self.client.get_region_cached(b, &self.region)?, + Some(b) => self.client.get_region_cached(b, &self.region).await?, None => DEFAULT_REGION.to_string(), }) } /// Execute the request, returning the response. Only used in [`S3Api::send()`] pub async fn execute(&mut self) -> Result { - self.inner_region = self.compute_inner_region()?; + self.inner_region = self.compute_inner_region().await?; self.client .execute( self.method.clone(), @@ -222,7 +222,7 @@ pub trait S3Api: ToS3Request { /// or an error if the request failed at any stage. /// async fn send(self) -> Result { - let mut req = self.to_s3request()?; + let mut req: S3Request = self.to_s3request()?; let resp: Result = req.execute().await; Self::S3Response::from_s3response(req, resp).await } diff --git a/tests/test_append_object.rs b/tests/test_append_object.rs index 9dcebf3..5dc82b5 100644 --- a/tests/test_append_object.rs +++ b/tests/test_append_object.rs @@ -69,10 +69,10 @@ async fn create_object_helper( } /// Append to the end of an existing object (happy flow) -#[tokio::test(flavor = "multi_thread", worker_threads = 10)] +#[tokio::test(flavor = "multi_thread")] async fn append_object_0() { let ctx = TestContext::new_from_env(); - if !ctx.client.is_minio_express() { + if !ctx.client.is_minio_express().await { println!("Skipping test because it is NOT running in MinIO Express mode"); return; } @@ -124,10 +124,10 @@ async fn append_object_0() { } /// Append to the beginning of an existing object (happy flow) -#[tokio::test(flavor = "multi_thread", worker_threads = 10)] +#[tokio::test(flavor = "multi_thread")] async fn append_object_1() { let ctx = TestContext::new_from_env(); - if !ctx.client.is_minio_express() { + if !ctx.client.is_minio_express().await { println!("Skipping test because it is NOT running in MinIO Express mode"); return; } @@ -178,10 +178,10 @@ async fn append_object_1() { } /// Append to the middle of an existing object (error InvalidWriteOffset) -#[tokio::test(flavor = "multi_thread", worker_threads = 10)] +#[tokio::test(flavor = "multi_thread")] async fn append_object_2() { let ctx = TestContext::new_from_env(); - if !ctx.client.is_minio_express() { + if !ctx.client.is_minio_express().await { println!("Skipping test because it is NOT running in MinIO Express mode"); return; } @@ -214,10 +214,10 @@ async fn append_object_2() { } /// Append beyond the size of an existing object (error InvalidWriteOffset) -#[tokio::test(flavor = "multi_thread", worker_threads = 10)] +#[tokio::test(flavor = "multi_thread")] async fn append_object_3() { let ctx = TestContext::new_from_env(); - if !ctx.client.is_minio_express() { + if !ctx.client.is_minio_express().await { println!("Skipping test because it is NOT running in MinIO Express mode"); return; } @@ -250,10 +250,10 @@ async fn append_object_3() { } /// Append to the beginning/end of a non-existing object (happy flow) -#[tokio::test(flavor = "multi_thread", worker_threads = 10)] +#[tokio::test(flavor = "multi_thread")] async fn append_object_4() { let ctx = TestContext::new_from_env(); - if !ctx.client.is_minio_express() { + if !ctx.client.is_minio_express().await { println!("Skipping test because it is NOT running in MinIO Express mode"); return; } @@ -301,10 +301,10 @@ async fn append_object_4() { } /// Append beyond the size of a non-existing object (error NoSuchKey) -#[tokio::test(flavor = "multi_thread", worker_threads = 10)] +#[tokio::test(flavor = "multi_thread")] async fn append_object_5() { let ctx = TestContext::new_from_env(); - if !ctx.client.is_minio_express() { + if !ctx.client.is_minio_express().await { println!("Skipping test because it is NOT running in MinIO Express mode"); return; } @@ -331,10 +331,10 @@ async fn append_object_5() { } } -#[tokio::test(flavor = "multi_thread", worker_threads = 10)] +#[tokio::test(flavor = "multi_thread")] async fn append_object_content_0() { let ctx = TestContext::new_from_env(); - if !ctx.client.is_minio_express() { + if !ctx.client.is_minio_express().await { println!("Skipping test because it is NOT running in MinIO Express mode"); return; } @@ -381,10 +381,10 @@ async fn append_object_content_0() { assert_eq!(resp.object_size, size * 2); } -#[tokio::test(flavor = "multi_thread", worker_threads = 10)] +#[tokio::test(flavor = "multi_thread")] async fn append_object_content_1() { let ctx = TestContext::new_from_env(); - if !ctx.client.is_minio_express() { + if !ctx.client.is_minio_express().await { println!("Skipping test because it is NOT running in MinIO Express mode"); return; } @@ -433,10 +433,10 @@ async fn append_object_content_1() { } } -#[tokio::test(flavor = "multi_thread", worker_threads = 10)] +#[tokio::test(flavor = "multi_thread")] async fn append_object_content_2() { let ctx = TestContext::new_from_env(); - if !ctx.client.is_minio_express() { + if !ctx.client.is_minio_express().await { println!("Skipping test because it is NOT running in MinIO Express mode"); return; } @@ -484,10 +484,10 @@ async fn append_object_content_2() { } /// Test sending AppendObject across async tasks. -#[tokio::test(flavor = "multi_thread", worker_threads = 10)] +#[tokio::test(flavor = "multi_thread")] async fn append_object_content_3() { let ctx = TestContext::new_from_env(); - if !ctx.client.is_minio_express() { + if !ctx.client.is_minio_express().await { println!("Skipping test because it is NOT running in MinIO Express mode"); return; } diff --git a/tests/test_bucket_create_delete.rs b/tests/test_bucket_create_delete.rs index 27b8dc6..f3ef589 100644 --- a/tests/test_bucket_create_delete.rs +++ b/tests/test_bucket_create_delete.rs @@ -20,7 +20,7 @@ use minio::s3::types::S3Api; use minio_common::test_context::TestContext; use minio_common::utils::rand_bucket_name; -#[tokio::test(flavor = "multi_thread", worker_threads = 10)] +#[tokio::test(flavor = "multi_thread")] async fn bucket_create() { let ctx = TestContext::new_from_env(); let bucket_name = rand_bucket_name(); @@ -48,7 +48,7 @@ async fn bucket_create() { } } -#[tokio::test(flavor = "multi_thread", worker_threads = 10)] +#[tokio::test(flavor = "multi_thread")] async fn bucket_delete() { let ctx = TestContext::new_from_env(); let bucket_name = rand_bucket_name(); diff --git a/tests/test_bucket_encryption.rs b/tests/test_bucket_encryption.rs index 947cb30..6951d46 100644 --- a/tests/test_bucket_encryption.rs +++ b/tests/test_bucket_encryption.rs @@ -20,7 +20,7 @@ use minio::s3::response::{ use minio::s3::types::{S3Api, SseConfig}; use minio_common::test_context::TestContext; -#[tokio::test(flavor = "multi_thread", worker_threads = 10)] +#[tokio::test(flavor = "multi_thread")] async fn bucket_encryption() { let ctx = TestContext::new_from_env(); let (bucket_name, _cleanup) = ctx.create_bucket_helper().await; diff --git a/tests/test_bucket_exists.rs b/tests/test_bucket_exists.rs index eb43800..e63872f 100644 --- a/tests/test_bucket_exists.rs +++ b/tests/test_bucket_exists.rs @@ -18,7 +18,7 @@ use minio::s3::response::{BucketExistsResponse, DeleteBucketResponse}; use minio::s3::types::S3Api; use minio_common::test_context::TestContext; -#[tokio::test(flavor = "multi_thread", worker_threads = 10)] +#[tokio::test(flavor = "multi_thread")] async fn bucket_exists() { let ctx = TestContext::new_from_env(); let (bucket_name, _cleanup) = ctx.create_bucket_helper().await; diff --git a/tests/test_bucket_lifecycle.rs b/tests/test_bucket_lifecycle.rs index 6f4062f..844351e 100644 --- a/tests/test_bucket_lifecycle.rs +++ b/tests/test_bucket_lifecycle.rs @@ -23,7 +23,7 @@ use minio::s3::types::S3Api; use minio_common::example::create_bucket_lifecycle_config_examples; use minio_common::test_context::TestContext; -#[tokio::test(flavor = "multi_thread", worker_threads = 10)] +#[tokio::test(flavor = "multi_thread")] async fn bucket_lifecycle() { let ctx = TestContext::new_from_env(); let (bucket_name, _cleanup) = ctx.create_bucket_helper().await; diff --git a/tests/test_bucket_notification.rs b/tests/test_bucket_notification.rs index 00d7b83..8298417 100644 --- a/tests/test_bucket_notification.rs +++ b/tests/test_bucket_notification.rs @@ -23,9 +23,14 @@ use minio_common::test_context::TestContext; const SQS_ARN: &str = "arn:minio:sqs::miniojavatest:webhook"; -#[tokio::test(flavor = "multi_thread", worker_threads = 10)] +#[tokio::test(flavor = "multi_thread")] async fn test_bucket_notification() { let ctx = TestContext::new_from_env(); + if ctx.client.is_minio_express().await { + println!("Skipping test because it is running in MinIO Express mode"); + return; + } + let (bucket_name, _cleanup) = ctx.create_bucket_helper().await; let config: NotificationConfig = create_bucket_notification_config_example(); diff --git a/tests/test_bucket_policy.rs b/tests/test_bucket_policy.rs index 4db77c8..1f675f5 100644 --- a/tests/test_bucket_policy.rs +++ b/tests/test_bucket_policy.rs @@ -21,7 +21,7 @@ use minio::s3::types::S3Api; use minio_common::example::create_bucket_policy_config_example; use minio_common::test_context::TestContext; -#[tokio::test(flavor = "multi_thread", worker_threads = 10)] +#[tokio::test(flavor = "multi_thread")] async fn bucket_policy() { let ctx = TestContext::new_from_env(); let (bucket_name, _cleanup) = ctx.create_bucket_helper().await; diff --git a/tests/test_bucket_replication.rs b/tests/test_bucket_replication.rs index 67a8eb1..0654e22 100644 --- a/tests/test_bucket_replication.rs +++ b/tests/test_bucket_replication.rs @@ -26,10 +26,10 @@ use minio_common::example::{ }; use minio_common::test_context::TestContext; -#[tokio::test(flavor = "multi_thread", worker_threads = 10)] +#[tokio::test(flavor = "multi_thread")] async fn bucket_replication_s3() { let ctx = TestContext::new_from_env(); - if ctx.client.is_minio_express() { + if ctx.client.is_minio_express().await { println!("Skipping test because it is running in MinIO Express mode"); return; } @@ -134,11 +134,11 @@ async fn bucket_replication_s3() { //println!("response of getting replication: resp={:?}", resp); } -#[tokio::test(flavor = "multi_thread", worker_threads = 10)] +#[tokio::test(flavor = "multi_thread")] async fn bucket_replication_s3express() { let ctx = TestContext::new_from_env(); - if !ctx.client.is_minio_express() { + if !ctx.client.is_minio_express().await { println!("Skipping test because it is NOT running in MinIO Express mode"); return; } diff --git a/tests/test_bucket_tags.rs b/tests/test_bucket_tags.rs index 3da68f9..54fa4ae 100644 --- a/tests/test_bucket_tags.rs +++ b/tests/test_bucket_tags.rs @@ -22,10 +22,10 @@ use minio::s3::types::S3Api; use minio_common::example::create_tags_example; use minio_common::test_context::TestContext; -#[tokio::test(flavor = "multi_thread", worker_threads = 10)] +#[tokio::test(flavor = "multi_thread")] async fn bucket_tags_s3() { let ctx = TestContext::new_from_env(); - if ctx.client.is_minio_express() { + if ctx.client.is_minio_express().await { println!("Skipping test because it is running in MinIO Express mode"); return; } @@ -73,10 +73,10 @@ async fn bucket_tags_s3() { assert_eq!(resp.region, DEFAULT_REGION); } -#[tokio::test(flavor = "multi_thread", worker_threads = 10)] +#[tokio::test(flavor = "multi_thread")] async fn bucket_tags_s3express() { let ctx = TestContext::new_from_env(); - if !ctx.client.is_minio_express() { + if !ctx.client.is_minio_express().await { println!("Skipping test because it is NOT running in MinIO Express mode"); return; } diff --git a/tests/test_bucket_versioning.rs b/tests/test_bucket_versioning.rs index 12251a0..84b11a6 100644 --- a/tests/test_bucket_versioning.rs +++ b/tests/test_bucket_versioning.rs @@ -20,10 +20,10 @@ use minio::s3::response::{GetBucketVersioningResponse, PutBucketVersioningRespon use minio::s3::types::S3Api; use minio_common::test_context::TestContext; -#[tokio::test(flavor = "multi_thread", worker_threads = 10)] +#[tokio::test(flavor = "multi_thread")] async fn bucket_versioning_s3() { let ctx = TestContext::new_from_env(); - if ctx.client.is_minio_express() { + if ctx.client.is_minio_express().await { println!("Skipping test because it is running in MinIO Express mode"); return; } @@ -70,10 +70,10 @@ async fn bucket_versioning_s3() { assert_eq!(resp.region, DEFAULT_REGION); } -#[tokio::test(flavor = "multi_thread", worker_threads = 10)] +#[tokio::test(flavor = "multi_thread")] async fn bucket_versioning_s3express() { let ctx = TestContext::new_from_env(); - if !ctx.client.is_minio_express() { + if !ctx.client.is_minio_express().await { println!("Skipping test because it is NOT running in MinIO Express mode"); return; } diff --git a/tests/test_get_object.rs b/tests/test_get_object.rs index 1a3cfae..93c05c2 100644 --- a/tests/test_get_object.rs +++ b/tests/test_get_object.rs @@ -19,7 +19,7 @@ use minio::s3::types::S3Api; use minio_common::test_context::TestContext; use minio_common::utils::rand_object_name; -#[tokio::test(flavor = "multi_thread", worker_threads = 10)] +#[tokio::test(flavor = "multi_thread")] async fn get_object() { let ctx = TestContext::new_from_env(); let (bucket_name, _cleanup) = ctx.create_bucket_helper().await; diff --git a/tests/test_get_presigned_object_url.rs b/tests/test_get_presigned_object_url.rs index 0f1d059..3f8c50d 100644 --- a/tests/test_get_presigned_object_url.rs +++ b/tests/test_get_presigned_object_url.rs @@ -19,7 +19,7 @@ use minio::s3::response::GetPresignedObjectUrlResponse; use minio_common::test_context::TestContext; use minio_common::utils::rand_object_name; -#[tokio::test(flavor = "multi_thread", worker_threads = 10)] +#[tokio::test(flavor = "multi_thread")] async fn get_presigned_object_url() { let ctx = TestContext::new_from_env(); let (bucket_name, _cleanup) = ctx.create_bucket_helper().await; diff --git a/tests/test_get_presigned_post_form_data.rs b/tests/test_get_presigned_post_form_data.rs index 70008dc..41544d3 100644 --- a/tests/test_get_presigned_post_form_data.rs +++ b/tests/test_get_presigned_post_form_data.rs @@ -19,7 +19,7 @@ use minio_common::test_context::TestContext; use minio_common::utils::rand_object_name; use std::collections::HashMap; -#[tokio::test(flavor = "multi_thread", worker_threads = 10)] +#[tokio::test(flavor = "multi_thread")] async fn get_presigned_post_form_data() { let ctx = TestContext::new_from_env(); let (bucket_name, _cleanup) = ctx.create_bucket_helper().await; diff --git a/tests/test_list_buckets.rs b/tests/test_list_buckets.rs index c93f468..b90f084 100644 --- a/tests/test_list_buckets.rs +++ b/tests/test_list_buckets.rs @@ -18,7 +18,7 @@ use minio::s3::types::S3Api; use minio_common::cleanup_guard::CleanupGuard; use minio_common::test_context::TestContext; -#[tokio::test(flavor = "multi_thread", worker_threads = 10)] +#[tokio::test(flavor = "multi_thread")] async fn list_buckets() { const N_BUCKETS: usize = 3; let ctx = TestContext::new_from_env(); diff --git a/tests/test_list_objects.rs b/tests/test_list_objects.rs index a470b23..deeab65 100644 --- a/tests/test_list_objects.rs +++ b/tests/test_list_objects.rs @@ -13,12 +13,12 @@ // See the License for the specific language governing permissions and // limitations under the License. +use async_std::stream::StreamExt; use minio::s3::response::PutObjectContentResponse; use minio::s3::types::ToStream; use minio_common::test_context::TestContext; use minio_common::utils::rand_object_name; use std::collections::HashSet; -use tokio_stream::StreamExt; async fn list_objects( use_api_v1: bool, @@ -37,7 +37,7 @@ async fn list_objects( } let ctx = TestContext::new_from_env(); - let is_express = ctx.client.is_minio_express(); + let is_express = ctx.client.is_minio_express().await; if is_express && !express { println!("Skipping test because it is running in MinIO Express mode"); return; @@ -97,29 +97,29 @@ async fn list_objects( assert_eq!(names_set_after, names_set_before); } -#[tokio::test(flavor = "multi_thread", worker_threads = 10)] +#[tokio::test(flavor = "multi_thread")] async fn list_objects_v1_no_versions() { list_objects(true, false, false, 5, 5).await; } -#[tokio::test(flavor = "multi_thread", worker_threads = 10)] +#[tokio::test(flavor = "multi_thread")] async fn list_objects_v1_with_versions() { list_objects(true, true, false, 5, 5).await; } -#[tokio::test(flavor = "multi_thread", worker_threads = 10)] +#[tokio::test(flavor = "multi_thread")] async fn list_objects_v2_no_versions() { list_objects(false, false, false, 5, 5).await; } -#[tokio::test(flavor = "multi_thread", worker_threads = 10)] +#[tokio::test(flavor = "multi_thread")] async fn list_objects_v2_with_versions() { list_objects(false, true, false, 5, 5).await; } /// Test for S3-Express: List objects with S3-Express are only supported with V2 API, without /// versions, and yield unsorted results. -#[tokio::test(flavor = "multi_thread", worker_threads = 10)] +#[tokio::test(flavor = "multi_thread")] async fn list_objects_express() { list_objects(false, false, true, 5, 5).await; } diff --git a/tests/test_listen_bucket_notification.rs b/tests/test_listen_bucket_notification.rs index b948cc0..b8c838b 100644 --- a/tests/test_listen_bucket_notification.rs +++ b/tests/test_listen_bucket_notification.rs @@ -13,6 +13,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use async_std::stream::StreamExt; use async_std::task; use minio::s3::builders::ObjectContent; use minio::s3::response::PutObjectContentResponse; @@ -21,9 +22,8 @@ use minio_common::rand_src::RandSrc; use minio_common::test_context::TestContext; use minio_common::utils::rand_object_name; use tokio::sync::mpsc; -use tokio_stream::StreamExt; -#[tokio::test(flavor = "multi_thread", worker_threads = 10)] +#[tokio::test(flavor = "multi_thread")] async fn listen_bucket_notification() { let ctx = TestContext::new_from_env(); let (bucket_name, _cleanup) = ctx.create_bucket_helper().await; diff --git a/tests/test_object_compose.rs b/tests/test_object_compose.rs index 8aaba4b..5604e4e 100644 --- a/tests/test_object_compose.rs +++ b/tests/test_object_compose.rs @@ -20,7 +20,7 @@ use minio_common::rand_src::RandSrc; use minio_common::test_context::TestContext; use minio_common::utils::rand_object_name; -#[tokio::test(flavor = "multi_thread", worker_threads = 10)] +#[tokio::test(flavor = "multi_thread")] async fn compose_object() { let ctx = TestContext::new_from_env(); let (bucket_name, _cleanup) = ctx.create_bucket_helper().await; diff --git a/tests/test_object_copy.rs b/tests/test_object_copy.rs index bd2800a..b2757b9 100644 --- a/tests/test_object_copy.rs +++ b/tests/test_object_copy.rs @@ -20,10 +20,10 @@ use minio_common::rand_src::RandSrc; use minio_common::test_context::TestContext; use minio_common::utils::rand_object_name; -#[tokio::test(flavor = "multi_thread", worker_threads = 10)] +#[tokio::test(flavor = "multi_thread")] async fn copy_object() { let ctx = TestContext::new_from_env(); - if ctx.client.is_minio_express() { + if ctx.client.is_minio_express().await { println!("Skipping test because it is running in MinIO Express mode"); return; } diff --git a/tests/test_object_legal_hold.rs b/tests/test_object_legal_hold.rs index d0b75c7..2fe0862 100644 --- a/tests/test_object_legal_hold.rs +++ b/tests/test_object_legal_hold.rs @@ -25,10 +25,10 @@ use minio_common::cleanup_guard::CleanupGuard; use minio_common::test_context::TestContext; use minio_common::utils::{rand_bucket_name, rand_object_name}; -#[tokio::test(flavor = "multi_thread", worker_threads = 10)] +#[tokio::test(flavor = "multi_thread")] async fn object_legal_hold_s3() { let ctx = TestContext::new_from_env(); - if ctx.client.is_minio_express() { + if ctx.client.is_minio_express().await { println!("Skipping test because it is running in MinIO Express mode"); return; } diff --git a/tests/test_object_lock_config.rs b/tests/test_object_lock_config.rs index 1725a51..b42b324 100644 --- a/tests/test_object_lock_config.rs +++ b/tests/test_object_lock_config.rs @@ -22,10 +22,10 @@ use minio_common::cleanup_guard::CleanupGuard; use minio_common::test_context::TestContext; use minio_common::utils::rand_bucket_name; -#[tokio::test(flavor = "multi_thread", worker_threads = 10)] +#[tokio::test(flavor = "multi_thread")] async fn object_lock_config() { let ctx = TestContext::new_from_env(); - if ctx.client.is_minio_express() { + if ctx.client.is_minio_express().await { println!("Skipping test because it is running in MinIO Express mode"); return; } diff --git a/tests/test_object_put.rs b/tests/test_object_put.rs index 924f65b..e24ebf1 100644 --- a/tests/test_object_put.rs +++ b/tests/test_object_put.rs @@ -22,7 +22,7 @@ use minio_common::test_context::TestContext; use minio_common::utils::rand_object_name; use tokio::sync::mpsc; -#[tokio::test(flavor = "multi_thread", worker_threads = 10)] +#[tokio::test(flavor = "multi_thread")] async fn put_object() { let ctx = TestContext::new_from_env(); let (bucket_name, _cleanup) = ctx.create_bucket_helper().await; @@ -53,7 +53,7 @@ async fn put_object() { assert_eq!(resp.size, size); } -#[tokio::test(flavor = "multi_thread", worker_threads = 10)] +#[tokio::test(flavor = "multi_thread")] async fn put_object_multipart() { let ctx = TestContext::new_from_env(); let (bucket_name, _cleanup) = ctx.create_bucket_helper().await; @@ -81,7 +81,7 @@ async fn put_object_multipart() { assert_eq!(resp.size as u64, size); } -#[tokio::test(flavor = "multi_thread", worker_threads = 10)] +#[tokio::test(flavor = "multi_thread")] async fn put_object_content() { let ctx = TestContext::new_from_env(); let (bucket_name, _cleanup) = ctx.create_bucket_helper().await; @@ -146,7 +146,7 @@ async fn put_object_content() { } /// Test sending ObjectContent across async tasks. -#[tokio::test(flavor = "multi_thread", worker_threads = 10)] +#[tokio::test(flavor = "multi_thread")] async fn put_object_content_2() { let ctx = TestContext::new_from_env(); let (bucket_name, _cleanup) = ctx.create_bucket_helper().await; diff --git a/tests/test_object_remove.rs b/tests/test_object_remove.rs index cc7c820..8efb3ba 100644 --- a/tests/test_object_remove.rs +++ b/tests/test_object_remove.rs @@ -13,14 +13,14 @@ // See the License for the specific language governing permissions and // limitations under the License. +use async_std::stream::StreamExt; use minio::s3::builders::ObjectToDelete; use minio::s3::response::PutObjectContentResponse; use minio::s3::types::ToStream; use minio_common::test_context::TestContext; use minio_common::utils::rand_object_name; -use tokio_stream::StreamExt; -#[tokio::test(flavor = "multi_thread", worker_threads = 10)] +#[tokio::test(flavor = "multi_thread")] async fn remove_objects() { let ctx = TestContext::new_from_env(); let (bucket_name, _cleanup) = ctx.create_bucket_helper().await; diff --git a/tests/test_object_retention.rs b/tests/test_object_retention.rs index cc55101..a551d03 100644 --- a/tests/test_object_retention.rs +++ b/tests/test_object_retention.rs @@ -26,10 +26,10 @@ use minio_common::rand_src::RandSrc; use minio_common::test_context::TestContext; use minio_common::utils::{rand_bucket_name, rand_object_name}; -#[tokio::test(flavor = "multi_thread", worker_threads = 10)] +#[tokio::test(flavor = "multi_thread")] async fn object_retention() { let ctx = TestContext::new_from_env(); - if ctx.client.is_minio_express() { + if ctx.client.is_minio_express().await { println!("Skipping test because it is running in MinIO Express mode"); return; } @@ -66,7 +66,7 @@ async fn object_retention() { //assert_eq!(resp.etag, ""); let retain_until_date = utc_now() + chrono::Duration::days(1); - let obj_resp: PutObjectRetentionResponse = ctx + let resp: PutObjectRetentionResponse = ctx .client .put_object_retention(&bucket_name, &object_name) .retention_mode(Some(RetentionMode::GOVERNANCE)) @@ -74,10 +74,10 @@ async fn object_retention() { .send() .await .unwrap(); - assert_eq!(obj_resp.bucket, bucket_name); - assert_eq!(obj_resp.object, object_name); - assert_eq!(obj_resp.version_id, None); - assert_eq!(obj_resp.region, DEFAULT_REGION); + assert_eq!(resp.bucket, bucket_name); + assert_eq!(resp.object, object_name); + assert_eq!(resp.version_id, None); + assert_eq!(resp.region, DEFAULT_REGION); let resp: GetObjectRetentionResponse = ctx .client diff --git a/tests/test_object_tags.rs b/tests/test_object_tags.rs index 70a5f89..9dfb778 100644 --- a/tests/test_object_tags.rs +++ b/tests/test_object_tags.rs @@ -25,10 +25,10 @@ use minio_common::test_context::TestContext; use minio_common::utils::rand_object_name; use std::collections::HashMap; -#[tokio::test(flavor = "multi_thread", worker_threads = 10)] +#[tokio::test(flavor = "multi_thread")] async fn object_tags() { let ctx = TestContext::new_from_env(); - if ctx.client.is_minio_express() { + if ctx.client.is_minio_express().await { println!("Skipping test because it is running in MinIO Express mode"); return; } diff --git a/tests/test_put_object.rs b/tests/test_put_object.rs index 298edaa..ed48377 100644 --- a/tests/test_put_object.rs +++ b/tests/test_put_object.rs @@ -24,7 +24,7 @@ use minio_common::test_context::TestContext; use minio_common::utils::rand_object_name; use tokio::sync::mpsc; -#[tokio::test(flavor = "multi_thread", worker_threads = 10)] +#[tokio::test(flavor = "multi_thread")] async fn put_object() { let ctx = TestContext::new_from_env(); let (bucket_name, _cleanup) = ctx.create_bucket_helper().await; @@ -78,7 +78,7 @@ async fn put_object() { } } -#[tokio::test(flavor = "multi_thread", worker_threads = 10)] +#[tokio::test(flavor = "multi_thread")] async fn put_object_multipart() { let ctx = TestContext::new_from_env(); let (bucket_name, _cleanup) = ctx.create_bucket_helper().await; @@ -119,7 +119,7 @@ async fn put_object_multipart() { assert_eq!(resp.version_id, None); } -#[tokio::test(flavor = "multi_thread", worker_threads = 10)] +#[tokio::test(flavor = "multi_thread")] async fn put_object_content_1() { let ctx = TestContext::new_from_env(); let (bucket_name, _cleanup) = ctx.create_bucket_helper().await; @@ -162,7 +162,7 @@ async fn put_object_content_1() { } } -#[tokio::test(flavor = "multi_thread", worker_threads = 10)] +#[tokio::test(flavor = "multi_thread")] async fn put_object_content_2() { let ctx = TestContext::new_from_env(); let (bucket_name, _cleanup) = ctx.create_bucket_helper().await; @@ -202,7 +202,7 @@ async fn put_object_content_2() { } /// Test sending PutObject across async tasks. -#[tokio::test(flavor = "multi_thread", worker_threads = 10)] +#[tokio::test(flavor = "multi_thread")] async fn put_object_content_3() { let ctx = TestContext::new_from_env(); let (bucket_name, _cleanup) = ctx.create_bucket_helper().await; diff --git a/tests/test_select_object_content.rs b/tests/test_select_object_content.rs index 5d1b196..af9039f 100644 --- a/tests/test_select_object_content.rs +++ b/tests/test_select_object_content.rs @@ -20,10 +20,10 @@ use minio_common::example::{create_select_content_data, create_select_content_re use minio_common::test_context::TestContext; use minio_common::utils::rand_object_name; -#[tokio::test(flavor = "multi_thread", worker_threads = 10)] +#[tokio::test(flavor = "multi_thread")] async fn select_object_content_s3() { let ctx = TestContext::new_from_env(); - if ctx.client.is_minio_express() { + if ctx.client.is_minio_express().await { println!("Skipping test because it is running in MinIO Express mode"); return; } @@ -61,10 +61,10 @@ async fn select_object_content_s3() { assert_eq!(got, select_data); } -#[tokio::test(flavor = "multi_thread", worker_threads = 10)] +#[tokio::test(flavor = "multi_thread")] async fn select_object_content_express() { let ctx = TestContext::new_from_env(); - if !ctx.client.is_minio_express() { + if !ctx.client.is_minio_express().await { println!("Skipping test because it is NOT running in MinIO Express mode"); return; } diff --git a/tests/test_upload_download_object.rs b/tests/test_upload_download_object.rs index 7c83ef4..9422d3e 100644 --- a/tests/test_upload_download_object.rs +++ b/tests/test_upload_download_object.rs @@ -90,12 +90,12 @@ async fn upload_download_object(size: u64) { fs::remove_file(&filename).unwrap(); } -#[tokio::test(flavor = "multi_thread", worker_threads = 10)] +#[tokio::test(flavor = "multi_thread")] async fn upload_download_object_1() { upload_download_object(16).await; } -#[tokio::test(flavor = "multi_thread", worker_threads = 10)] +#[tokio::test(flavor = "multi_thread")] async fn upload_download_object_2() { upload_download_object(16 + 5 * 1024 * 1024).await; }