diff --git a/src/s3/args.rs b/src/s3/args.rs index 5cfb52f..fa78eb3 100644 --- a/src/s3/args.rs +++ b/src/s3/args.rs @@ -1054,43 +1054,6 @@ impl<'a> SelectObjectContentArgs<'a> { } } -/// Argument for [listen_bucket_notification()](crate::s3::client::Client::listen_bucket_notification) API -#[derive(Clone, Debug)] -pub struct ListenBucketNotificationArgs { - pub extra_headers: Option, - pub extra_query_params: Option, - pub region: Option, - pub bucket: String, - pub prefix: Option, - pub suffix: Option, - pub events: Option>, -} - -impl ListenBucketNotificationArgs { - /// Returns argument for [listen_bucket_notification()](crate::s3::client::Client::listen_bucket_notification) API with given bucket name and callback function for results. - /// - /// # Examples - /// - /// ``` - /// use minio::s3::args::*; - /// use minio::s3::types::NotificationRecords; - /// - /// let args = ListenBucketNotificationArgs::new("my-bucket").unwrap(); - /// ``` - pub fn new(bucket_name: &str) -> Result { - check_bucket_name(bucket_name, true)?; - Ok(ListenBucketNotificationArgs { - extra_headers: None, - extra_query_params: None, - region: None, - bucket: bucket_name.to_owned(), - prefix: None, - suffix: None, - events: None, - }) - } -} - #[derive(Clone, Debug, Default)] /// Argument for [upload_part_copy()](crate::s3::client::Client::upload_part_copy) S3 API pub struct UploadPartCopyArgs<'a> { diff --git a/src/s3/builders.rs b/src/s3/builders.rs index 079e7ae..b4e9df1 100644 --- a/src/s3/builders.rs +++ b/src/s3/builders.rs @@ -13,5 +13,7 @@ //! Argument builders for [minio::s3::client::Client](crate::s3::client::Client) APIs mod list_objects; +mod listen_bucket_notification; pub use list_objects::*; +pub use listen_bucket_notification::*; diff --git a/src/s3/builders/listen_bucket_notification.rs b/src/s3/builders/listen_bucket_notification.rs new file mode 100644 index 0000000..dec20bd --- /dev/null +++ b/src/s3/builders/listen_bucket_notification.rs @@ -0,0 +1,139 @@ +// MinIO Rust Library for Amazon S3 Compatible Cloud Storage +// Copyright 2023 MinIO, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use async_trait::async_trait; +use futures_util::Stream; +use http::Method; + +use crate::s3::{ + client::Client, + error::Error, + response::ListenBucketNotificationResponse, + types::{NotificationRecords, S3Api, S3Request, ToS3Request}, + utils::{check_bucket_name, merge, Multimap}, +}; + +/// Argument builder for +/// [listen_bucket_notification()](crate::s3::client::Client::listen_bucket_notification) +/// API. +#[derive(Clone, Debug, Default)] +pub struct ListenBucketNotification { + client: Option, + + extra_headers: Option, + extra_query_params: Option, + region: Option, + bucket: String, + prefix: Option, + suffix: Option, + events: Option>, +} + +#[async_trait] +impl S3Api for ListenBucketNotification { + type S3Response = ( + ListenBucketNotificationResponse, + Box> + Unpin + Send>, + ); +} + +impl ToS3Request for ListenBucketNotification { + fn to_s3request(&self) -> Result { + let client = self.client.as_ref().ok_or(Error::NoClientProvided)?; + if client.is_aws_host() { + return Err(Error::UnsupportedApi(String::from( + "ListenBucketNotification", + ))); + } + + check_bucket_name(&self.bucket, true)?; + + let mut headers = Multimap::new(); + if let Some(v) = &self.extra_headers { + merge(&mut headers, v); + } + + let mut query_params = Multimap::new(); + if let Some(v) = &self.extra_query_params { + merge(&mut query_params, v); + } + if let Some(v) = &self.prefix { + query_params.insert(String::from("prefix"), v.to_string()); + } + if let Some(v) = &self.suffix { + query_params.insert(String::from("suffix"), v.to_string()); + } + if let Some(v) = &self.events { + for e in v.iter() { + query_params.insert(String::from("events"), e.to_string()); + } + } else { + query_params.insert(String::from("events"), String::from("s3:ObjectCreated:*")); + query_params.insert(String::from("events"), String::from("s3:ObjectRemoved:*")); + query_params.insert(String::from("events"), String::from("s3:ObjectAccessed:*")); + } + + let req = S3Request::new(client, Method::GET) + .region(self.region.as_deref()) + .bucket(Some(&self.bucket)) + .query_params(query_params) + .headers(headers); + Ok(req) + } +} + +impl ListenBucketNotification { + pub fn new(bucket_name: &str) -> ListenBucketNotification { + ListenBucketNotification { + bucket: bucket_name.to_owned(), + ..Default::default() + } + } + + pub fn client(mut self, client: &Client) -> Self { + self.client = Some(client.clone()); + self + } + + pub fn extra_headers(mut self, extra_headers: Option) -> Self { + self.extra_headers = extra_headers; + self + } + + pub fn extra_query_params(mut self, extra_query_params: Option) -> Self { + self.extra_query_params = extra_query_params; + self + } + + pub fn region(mut self, region: Option) -> Self { + self.region = region; + self + } + + pub fn prefix(mut self, prefix: Option) -> Self { + self.prefix = prefix; + self + } + + pub fn suffix(mut self, suffix: Option) -> Self { + self.suffix = suffix; + self + } + + pub fn events(mut self, events: Option>) -> Self { + self.events = events; + self + } +} diff --git a/src/s3/client.rs b/src/s3/client.rs index 85bc162..febd930 100644 --- a/src/s3/client.rs +++ b/src/s3/client.rs @@ -174,6 +174,10 @@ impl Client { .build() } + pub fn is_aws_host(&self) -> bool { + self.base_url.is_aws_host() + } + fn build_headers( &self, headers: &mut Multimap, diff --git a/src/s3/client/listen_bucket_notification.rs b/src/s3/client/listen_bucket_notification.rs index e58c800..2c91d4b 100644 --- a/src/s3/client/listen_bucket_notification.rs +++ b/src/s3/client/listen_bucket_notification.rs @@ -15,19 +15,7 @@ //! MinIO Extension API for S3 Buckets: ListenBucketNotification -use futures_util::stream; -use http::Method; -use tokio::io::AsyncBufReadExt; -use tokio_stream::{Stream, StreamExt}; -use tokio_util::io::StreamReader; - -use crate::s3::{ - args::ListenBucketNotificationArgs, - error::Error, - response::ListenBucketNotificationResponse, - types::NotificationRecords, - utils::{merge, Multimap}, -}; +use crate::s3::builders::ListenBucketNotification; use super::Client; @@ -38,97 +26,7 @@ impl Client { /// returned by the server and the latter is a stream of notification /// records. In normal operation (when there are no errors), the stream /// never ends. - pub async fn listen_bucket_notification( - &self, - args: ListenBucketNotificationArgs, - ) -> Result< - ( - ListenBucketNotificationResponse, - impl Stream>, - ), - Error, - > { - if self.base_url.is_aws_host() { - return Err(Error::UnsupportedApi(String::from( - "ListenBucketNotification", - ))); - } - - let region = self - .get_region(&args.bucket, args.region.as_deref()) - .await?; - - let mut headers = Multimap::new(); - if let Some(v) = &args.extra_headers { - merge(&mut headers, v); - } - - let mut query_params = Multimap::new(); - if let Some(v) = &args.extra_query_params { - merge(&mut query_params, v); - } - if let Some(v) = args.prefix { - query_params.insert(String::from("prefix"), v.to_string()); - } - if let Some(v) = args.suffix { - query_params.insert(String::from("suffix"), v.to_string()); - } - if let Some(v) = &args.events { - for e in v.iter() { - query_params.insert(String::from("events"), e.to_string()); - } - } else { - query_params.insert(String::from("events"), String::from("s3:ObjectCreated:*")); - query_params.insert(String::from("events"), String::from("s3:ObjectRemoved:*")); - query_params.insert(String::from("events"), String::from("s3:ObjectAccessed:*")); - } - - let resp = self - .execute( - Method::GET, - ®ion, - &mut headers, - &query_params, - Some(&args.bucket), - None, - None, - ) - .await?; - - let header_map = resp.headers().clone(); - - let body_stream = resp.bytes_stream(); - let body_stream = body_stream - .map(|r| r.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err))); - let stream_reader = StreamReader::new(body_stream); - - let record_stream = Box::pin(stream::unfold( - stream_reader, - move |mut reader| async move { - loop { - let mut line = String::new(); - match reader.read_line(&mut line).await { - Ok(n) => { - if n == 0 { - return None; - } - let s = line.trim(); - if s.is_empty() { - continue; - } - let records_res: Result = - serde_json::from_str(s).map_err(|e| e.into()); - return Some((records_res, reader)); - } - Err(e) => return Some((Err(e.into()), reader)), - } - } - }, - )); - - Ok(( - ListenBucketNotificationResponse::new(header_map, ®ion, &args.bucket), - record_stream, - )) + pub fn listen_bucket_notification(&self, bucket: &str) -> ListenBucketNotification { + ListenBucketNotification::new(bucket).client(self) } } diff --git a/src/s3/response.rs b/src/s3/response.rs index 3e40123..e387eba 100644 --- a/src/s3/response.rs +++ b/src/s3/response.rs @@ -32,10 +32,12 @@ use crate::s3::utils::{ }; mod list_objects; +mod listen_bucket_notification; pub use list_objects::{ ListObjectVersionsResponse, ListObjectsResponse, ListObjectsV1Response, ListObjectsV2Response, }; +pub use listen_bucket_notification::ListenBucketNotificationResponse; #[derive(Debug)] /// Response of [list_buckets()](crate::s3::client::Client::list_buckets) API @@ -566,28 +568,6 @@ impl SelectObjectContentResponse { } } -#[derive(Clone, Debug)] -/// Response of [listen_bucket_notification()](crate::s3::client::Client::listen_bucket_notification) API -pub struct ListenBucketNotificationResponse { - pub headers: HeaderMap, - pub region: String, - pub bucket_name: String, -} - -impl ListenBucketNotificationResponse { - pub fn new( - headers: HeaderMap, - region: &str, - bucket_name: &str, - ) -> ListenBucketNotificationResponse { - ListenBucketNotificationResponse { - headers, - region: region.to_string(), - bucket_name: bucket_name.to_string(), - } - } -} - /// Response of [delete_bucket_encryption()](crate::s3::client::Client::delete_bucket_encryption) API pub type DeleteBucketEncryptionResponse = BucketResponse; diff --git a/src/s3/response/listen_bucket_notification.rs b/src/s3/response/listen_bucket_notification.rs new file mode 100644 index 0000000..92aeaab --- /dev/null +++ b/src/s3/response/listen_bucket_notification.rs @@ -0,0 +1,87 @@ +// MinIO Rust Library for Amazon S3 Compatible Cloud Storage +// Copyright 2023 MinIO, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use futures_util::{stream, Stream, StreamExt}; +use http::HeaderMap; +use tokio::io::AsyncBufReadExt; +use tokio_util::io::StreamReader; + +use crate::s3::{ + error::Error, + types::{FromS3Response, NotificationRecords, S3Request}, +}; + +/// Response of +/// [listen_bucket_notification()](crate::s3::client::Client::listen_bucket_notification) +/// API +#[derive(Debug)] +pub struct ListenBucketNotificationResponse { + pub headers: HeaderMap, + pub region: String, + pub bucket: String, +} + +#[async_trait::async_trait] +impl FromS3Response + for ( + ListenBucketNotificationResponse, + Box> + Unpin + Send>, + ) +{ + async fn from_s3response<'a>( + req: S3Request<'a>, + resp: reqwest::Response, + ) -> Result { + let headers = resp.headers().clone(); + + let body_stream = resp.bytes_stream(); + let body_stream = body_stream + .map(|r| r.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err))); + let stream_reader = StreamReader::new(body_stream); + + let record_stream = Box::pin(stream::unfold( + stream_reader, + move |mut reader| async move { + loop { + let mut line = String::new(); + match reader.read_line(&mut line).await { + Ok(n) => { + if n == 0 { + return None; + } + let s = line.trim(); + if s.is_empty() { + continue; + } + let records_res: Result = + serde_json::from_str(&s).map_err(|e| e.into()); + return Some((records_res, reader)); + } + Err(e) => return Some((Err(e.into()), reader)), + } + } + }, + )); + + Ok(( + ListenBucketNotificationResponse { + headers, + region: req.get_computed_region(), + bucket: req.bucket.unwrap().to_string(), + }, + Box::new(record_stream), + )) + } +} diff --git a/src/s3/types.rs b/src/s3/types.rs index 34eaa33..6ab1ff2 100644 --- a/src/s3/types.rs +++ b/src/s3/types.rs @@ -33,13 +33,13 @@ use std::fmt; pub struct S3Request<'a> { client: &'a Client, - method: Method, - region: Option<&'a str>, - bucket: Option<&'a str>, - object: Option<&'a str>, - query_params: Multimap, - headers: Multimap, - body: Option>, + pub method: Method, + pub region: Option<&'a str>, + pub bucket: Option<&'a str>, + pub object: Option<&'a str>, + pub query_params: Multimap, + pub headers: Multimap, + pub body: Option>, // Computed region inner_region: String, @@ -90,6 +90,10 @@ impl<'a> S3Request<'a> { self } + pub fn get_computed_region(&self) -> String { + self.inner_region.clone() + } + pub async fn execute(&mut self) -> Result { // Lookup the region of the bucket if provided. self.inner_region = if let Some(bucket) = self.bucket { @@ -592,126 +596,165 @@ impl<'a> SelectRequest<'a> { } } -#[derive(Clone, Debug)] /// Progress information of [select_object_content()](crate::s3::client::Client::select_object_content) API +#[derive(Clone, Debug)] pub struct SelectProgress { pub bytes_scanned: usize, pub bytes_progressed: usize, pub bytes_returned: usize, } -#[derive(Debug, Deserialize, Serialize)] /// User identity contains principal ID +#[derive(Debug, Deserialize, Serialize, Clone, Default)] pub struct UserIdentity { - #[serde(alias = "principalId")] - pub principal_id: Option, + #[serde(alias = "principalId", default)] + pub principal_id: String, } /// Owner identity contains principal ID pub type OwnerIdentity = UserIdentity; -#[derive(Debug, Deserialize, Serialize)] -/// Request parameters contain principal ID, region and source IP address -pub struct RequestParameters { - #[serde(alias = "principalId")] - pub principal_id: Option, - #[serde(alias = "region")] - pub region: Option, - #[serde(alias = "sourceIPAddress")] - pub source_ip_address: Option, +/// Request parameters contain principal ID, region and source IP address, but +/// they are represented as a string-to-string map in the MinIO server. So we +/// provide methods to fetch the known fields and a map for underlying +/// representation. +#[derive(Debug, Deserialize, Serialize, Clone, Default)] +pub struct RequestParameters(HashMap); + +impl RequestParameters { + pub fn principal_id(&self) -> Option<&String> { + self.0.get("principalId") + } + + pub fn region(&self) -> Option<&String> { + self.0.get("region") + } + + pub fn source_ip_address(&self) -> Option<&String> { + self.0.get("sourceIPAddress") + } + + pub fn get_map(&self) -> &HashMap { + &self.0 + } } -#[derive(Debug, Deserialize, Serialize)] -/// Response elements information -pub struct ResponseElements { - #[serde(alias = "content-length")] - pub content_length: Option, - #[serde(alias = "x-amz-request-id")] - pub x_amz_request_id: Option, - #[serde(alias = "x-minio-deployment-id")] - pub x_minio_deployment_id: Option, - #[serde(alias = "x-minio-origin-endpoint")] - pub x_minio_origin_endpoint: Option, +#[derive(Debug, Deserialize, Serialize, Clone, Default)] +/// Response elements information: they are represented as a string-to-string +/// map in the MinIO server. So we provide methods to fetch the known fields and +/// a map for underlying representation. +pub struct ResponseElements(HashMap); + +impl ResponseElements { + pub fn content_length(&self) -> Option<&String> { + self.0.get("content-length") + } + + pub fn x_amz_request_id(&self) -> Option<&String> { + self.0.get("x-amz-request-id") + } + + pub fn x_minio_deployment_id(&self) -> Option<&String> { + self.0.get("x-minio-deployment-id") + } + + pub fn x_amz_id_2(&self) -> Option<&String> { + self.0.get("x-amz-id-2") + } + + pub fn x_minio_origin_endpoint(&self) -> Option<&String> { + self.0.get("x-minio-origin-endpoint") + } + + pub fn get_map(&self) -> &HashMap { + &self.0 + } } -#[derive(Debug, Deserialize, Serialize)] +#[derive(Debug, Deserialize, Serialize, Clone, Default)] /// S3 bucket information pub struct S3Bucket { - #[serde(alias = "name")] - pub name: Option, - #[serde(alias = "arn")] - pub arn: Option, - #[serde(alias = "ownerIdentity")] - pub owner_identity: Option, + #[serde(alias = "name", default)] + pub name: String, + #[serde(alias = "arn", default)] + pub arn: String, + #[serde(alias = "ownerIdentity", default)] + pub owner_identity: OwnerIdentity, } -#[derive(Debug, Deserialize, Serialize)] +#[derive(Debug, Deserialize, Serialize, Clone, Default)] /// S3 object information pub struct S3Object { - #[serde(alias = "key")] - pub key: Option, + #[serde(alias = "key", default)] + pub key: String, #[serde(alias = "size")] - pub size: Option, + pub size: Option, #[serde(alias = "eTag")] pub etag: Option, #[serde(alias = "contentType")] pub content_type: Option, #[serde(alias = "userMetadata")] pub user_metadata: Option>, - #[serde(alias = "sequencer")] - pub sequencer: Option, + #[serde(alias = "versionId", default)] + pub version_id: String, + #[serde(alias = "sequencer", default)] + pub sequencer: String, } -#[derive(Debug, Deserialize, Serialize)] +#[derive(Debug, Deserialize, Serialize, Clone, Default)] /// S3 definitions for NotificationRecord pub struct S3 { - #[serde(alias = "s3SchemaVersion")] - pub s3_schema_version: Option, - #[serde(alias = "configurationId")] - pub configuration_id: Option, - #[serde(alias = "bucket")] - pub bucket: Option, - #[serde(alias = "object")] - pub object: Option, + #[serde(alias = "s3SchemaVersion", default)] + pub s3_schema_version: String, + #[serde(alias = "configurationId", default)] + pub configuration_id: String, + #[serde(alias = "bucket", default)] + pub bucket: S3Bucket, + #[serde(alias = "object", default)] + pub object: S3Object, } -#[derive(Debug, Deserialize, Serialize)] +#[derive(Debug, Deserialize, Serialize, Clone, Default)] /// Source information pub struct Source { - #[serde(alias = "host")] - pub host: Option, + #[serde(alias = "host", default)] + pub host: String, #[serde(alias = "port")] pub port: Option, - #[serde(alias = "userAgent")] - pub user_agent: Option, + #[serde(alias = "userAgent", default)] + pub user_agent: String, } -#[derive(Debug, Deserialize, Serialize)] /// Notification record information +#[derive(Debug, Deserialize, Serialize, Clone)] pub struct NotificationRecord { - #[serde(alias = "eventVersion")] - pub event_version: Option, - #[serde(alias = "eventSource")] - pub event_source: Option, - #[serde(alias = "awsRegion")] - pub aws_region: Option, - #[serde(alias = "eventTime")] - pub event_time: Option, - #[serde(alias = "eventName")] - pub event_name: Option, - #[serde(alias = "userIdentity")] - pub user_identity: Option, - #[serde(alias = "requestParameters")] - pub request_parameters: Option, - #[serde(alias = "responseElements")] - pub response_elements: Option, - #[serde(alias = "s3")] - pub s3: Option, - #[serde(alias = "source")] - pub source: Option, + #[serde(alias = "eventVersion", default)] + pub event_version: String, + #[serde(alias = "eventSource", default)] + pub event_source: String, + #[serde(alias = "awsRegion", default)] + pub aws_region: String, + #[serde( + alias = "eventTime", + default, + with = "crate::s3::utils::aws_date_format" + )] + pub event_time: UtcTime, + #[serde(alias = "eventName", default)] + pub event_name: String, + #[serde(alias = "userIdentity", default)] + pub user_identity: UserIdentity, + #[serde(alias = "requestParameters", default)] + pub request_parameters: RequestParameters, + #[serde(alias = "responseElements", default)] + pub response_elements: ResponseElements, + #[serde(alias = "s3", default)] + pub s3: S3, + #[serde(alias = "source", default)] + pub source: Source, } -#[derive(Debug, Deserialize, Serialize)] +#[derive(Debug, Deserialize, Serialize, Clone)] /// Contains notification records pub struct NotificationRecords { #[serde(alias = "Records")] diff --git a/src/s3/utils.rs b/src/s3/utils.rs index 26bda77..088b28d 100644 --- a/src/s3/utils.rs +++ b/src/s3/utils.rs @@ -129,6 +129,26 @@ pub fn from_iso8601utc(s: &str) -> Result { )) } +pub mod aws_date_format { + use super::{from_iso8601utc, to_iso8601utc, UtcTime}; + use serde::{Deserialize, Deserializer, Serializer}; + + pub fn serialize(date: &UtcTime, serializer: S) -> Result + where + S: Serializer, + { + serializer.serialize_str(&to_iso8601utc(date.clone())) + } + + pub fn deserialize<'de, D>(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + let s = String::deserialize(deserializer)?; + Ok(from_iso8601utc(&s).map_err(serde::de::Error::custom)?) + } +} + /// Parses HTTP header value to time pub fn from_http_header_value(s: &str) -> Result { Ok(DateTime::::from_naive_utc_and_offset( diff --git a/tests/tests.rs b/tests/tests.rs index 18ff2af..5c9b20d 100644 --- a/tests/tests.rs +++ b/tests/tests.rs @@ -30,13 +30,13 @@ use minio::s3::args::*; use minio::s3::client::Client; use minio::s3::creds::StaticProvider; use minio::s3::http::BaseUrl; -use minio::s3::types::NotificationRecords; use minio::s3::types::ToStream; use minio::s3::types::{ CsvInputSerialization, CsvOutputSerialization, DeleteObject, FileHeaderInfo, NotificationConfig, ObjectLockConfig, PrefixFilterRule, QueueConfig, QuoteFields, RetentionMode, SelectRequest, SuffixFilterRule, }; +use minio::s3::types::{NotificationRecords, S3Api}; use minio::s3::utils::{to_iso8601utc, utc_now}; struct RandReader { @@ -554,24 +554,23 @@ impl ClientTest { .unwrap(); let event_fn = |event: NotificationRecords| { - for record in event.records.iter() { - if let Some(s3) = &record.s3 { - if let Some(object) = &s3.object { - if let Some(key) = &object.key { - if name == *key { - sender.send(true).unwrap(); - } - return false; - } - } + let record = event.records.iter().next(); + if let Some(record) = record { + let key = &record.s3.object.key; + if name == *key { + sender.send(true).unwrap(); + return false; } } sender.send(false).unwrap(); false }; - let args = ListenBucketNotificationArgs::new(&test_bucket).unwrap(); - let (_, mut event_stream) = client.listen_bucket_notification(args).await.unwrap(); + let (_, mut event_stream) = client + .listen_bucket_notification(&test_bucket) + .send() + .await + .unwrap(); while let Some(event) = event_stream.next().await { let event = event.unwrap(); if !event_fn(event) {