From e3a5729f0bd6383d35ace26f551a40b8f73032e0 Mon Sep 17 00:00:00 2001 From: theMackabu <theMackabu@gmail.com> Date: Fri, 1 Dec 2023 22:09:32 -0800 Subject: [PATCH] finish base rewrite --- Cargo.lock | 5 +- Cargo.toml | 2 +- Maidfile.toml | 6 +- crates/maid/client/Cargo.toml | 3 +- crates/maid/client/src/helpers/logger.rs | 49 +-- crates/maid/client/src/server/cli.rs | 46 +-- crates/maid/client/src/structs.rs | 27 +- crates/maid/server/src/docker/run.rs | 443 +++++++++++------------ crates/maid/server/src/main.rs | 140 ++++--- 9 files changed, 385 insertions(+), 336 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 94a8834..08d7522 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1293,7 +1293,7 @@ checksum = "973fff3a34801ae9c15ba9ef09e82b1e37e2fdcb554d3e5f05d38ab2896dd383" [[package]] name = "maid" -version = "1.1.0" +version = "1.2.0" dependencies = [ "anyhow", "chrono", @@ -1310,6 +1310,7 @@ dependencies = [ "indicatif", "inquire", "json5", + "lazy_static", "log", "macros-rs", "merge-struct", @@ -1333,7 +1334,7 @@ dependencies = [ [[package]] name = "maid_server" -version = "1.1.0" +version = "1.2.0" dependencies = [ "anyhow", "bollard", diff --git a/Cargo.toml b/Cargo.toml index c39d540..9c6cd3c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,7 +9,7 @@ members = [ ] [workspace.package] -version = "1.1.0" +version = "1.2.0" edition = "2021" license = "MIT" repository = "https://github.com/exact-rs/maid" diff --git a/Maidfile.toml b/Maidfile.toml index c9df2cf..034cb7e 100644 --- a/Maidfile.toml +++ b/Maidfile.toml @@ -7,12 +7,12 @@ import = [ [project] name = "maid" -version = "1.1.0" +version = "1.2.0" # build on a remote server [project.server] address = { host = "localhost", port = 3500, ssl = false } -token = "test_token" +token = "test_token1" # global enviroment [env] @@ -20,7 +20,7 @@ BOOL = false STRING = 'test' TYPE = '%{dir.home} %{env.STRING} %{arg.1}' ARR = ['hello', 'world'] -VERSION='1.1.0' +VERSION='1.2.0' [tasks.build] info = "Build binaries" diff --git a/crates/maid/client/Cargo.toml b/crates/maid/client/Cargo.toml index d028191..7fa46f5 100644 --- a/crates/maid/client/Cargo.toml +++ b/crates/maid/client/Cargo.toml @@ -41,6 +41,7 @@ notify-debouncer-mini = "0.4.1" serde = { version = "1.0.192", features = ["derive"] } human_bytes = { version = "0.4.3", default-features = false } tungstenite = { version = "0.20.1", features = ["native-tls"] } +lazy_static = "1.4.0" [build-dependencies] -chrono = "0.4.23" \ No newline at end of file +chrono = "0.4.23" diff --git a/crates/maid/client/src/helpers/logger.rs b/crates/maid/client/src/helpers/logger.rs index bb072c7..d91148f 100644 --- a/crates/maid/client/src/helpers/logger.rs +++ b/crates/maid/client/src/helpers/logger.rs @@ -1,27 +1,32 @@ #[macro_export] macro_rules! log { - ($level:expr, $($arg:tt)*) => { - let level_colors: std::collections::HashMap<&str, (&str, &str)> = [ - ("fatal", ("FATAL", "bright red")), - ("error", ("ERROR", "red")), - ("warning", ("WARN", "yellow")), - ("success", ("SUCCESS", "green")), - ("notice", ("NOTICE", "bright blue")), - ("docker", ("DOCKER", "bright yellow")), - ("build", ("BUILD", "bright green")), - ("info", ("INFO", "cyan")), - ("debug", ("DEBUG", "magenta")), - ] - .iter() - .cloned() - .collect(); + ($level:expr, $($arg:tt)*) => {{ + lazy_static::lazy_static! { + static ref LEVEL_COLORS: std::collections::HashMap<Level, (&'static str, &'static str)> = { + let mut map = std::collections::HashMap::new(); + map.insert(Level::Fatal, ("FATAL", "bright red")); + map.insert(Level::Docker, ("DOCKER", "bright yellow")); + map.insert(Level::Info, ("INFO", "cyan")); + map.insert(Level::Build, ("BUILD", "bright green")); + map.insert(Level::Success, ("SUCCESS", "green")); + map.insert(Level::Debug, ("DEBUG", "magenta")); + map.insert(Level::Notice, ("NOTICE", "bright blue")); + map.insert(Level::Warning, ("WARN", "yellow")); + map.insert(Level::Error, ("ERROR", "red")); + return map; + }; + } - match level_colors.get($level) { - Some((level_text, color_func)) => { - let level_text = level_text.color(color_func.to_string()); - println!("{} {}", level_text, format_args!($($arg)*).to_string()) - } - None => println!("Unknown log level: {}", $level), + if $level == Level::None { + print!("{}", format_args!($($arg)*).to_string()); + } else { + match LEVEL_COLORS.get(&$level) { + Some((level_text, color_func)) => { + let level_text = level_text.color(color_func.to_string()); + println!("{} {}", level_text, format_args!($($arg)*).to_string()) + } + None => println!("Unknown log level: {:?}", $level), + }; } - }; + }}; } diff --git a/crates/maid/client/src/server/cli.rs b/crates/maid/client/src/server/cli.rs index 43eb731..ef776df 100644 --- a/crates/maid/client/src/server/cli.rs +++ b/crates/maid/client/src/server/cli.rs @@ -1,9 +1,9 @@ use crate::helpers; use crate::server; -use crate::structs::{ConnectionData, ConnectionInfo, Maidfile, Task, Websocket}; +use crate::structs::{ConnectionData, ConnectionInfo, Kind, Level, Maidfile, Task, Websocket}; use colored::Colorize; -use macros_rs::{crashln, fmtstr, then}; +use macros_rs::{crashln, fmtstr}; use reqwest::blocking::Client; use tungstenite::protocol::frame::{coding::CloseCode::Normal, CloseFrame}; use tungstenite::{client::connect_with_config, client::IntoClientRequest, protocol::WebSocketConfig, Message}; @@ -20,13 +20,14 @@ fn health(client: Client, values: Maidfile) -> server::api::health::Route { } }; - let body = match response.json::<server::api::health::Route>() { - Ok(body) => body, - Err(err) => { - log::warn!("{err}"); - crashln!("Unable to connect to the maid server. Is the token correct?") - } - }; + let body = + match response.json::<server::api::health::Route>() { + Ok(body) => body, + Err(err) => { + log::warn!("{err}"); + crashln!("Unable to connect to the maid server. Is the token correct?") + } + }; return body; } @@ -84,12 +85,12 @@ pub fn remote(task: Task) { let body = health(client, task.maidfile.clone()); let (_, websocket, token, host, port) = server::parse::all(task.maidfile.clone()); - crate::log!("info", "connecting to {host}:{port}"); + crate::log!(Level::Info, "connecting to {host}:{port}"); if body.status.healthy.data == "yes" { - crate::log!("notice", "server reports healthy"); + crate::log!(Level::Notice, "server reports healthy"); } else { - crate::log!("warning", "failed to connect"); + crate::log!(Level::Warning, "failed to connect"); } let websocket_config = WebSocketConfig { @@ -126,21 +127,12 @@ pub fn remote(task: Task) { loop { match socket.read() { Ok(Message::Text(text)) => { - if let Ok(Websocket { time: _, data, level }) = serde_json::from_str::<Websocket>(&text) { - data.get("message").and_then(|m| m.as_str()).map(|msg| { - if level.as_str() == "none" { - print!("{msg}"); - } else { - crate::log!(level.as_str(), "{}", msg); - } - }); - - if data.get("binary").map_or(false, |d| d.as_bool().unwrap_or(false)) { - log::debug!("sending archive"); - socket.send(Message::Binary(std::fs::read(&file_name).unwrap())).unwrap(); + if let Ok(Websocket { message, kind, level, .. }) = serde_json::from_str::<Websocket>(&text) { + match kind { + Kind::Done => break, + Kind::Message => crate::log!(level, "{}", message.unwrap()), + Kind::Binary => socket.send(Message::Binary(std::fs::read(&file_name).unwrap())).unwrap(), } - - then!(data.get("done").map_or(false, |d| d.as_bool().unwrap_or(false)), break); } } Ok(Message::Binary(archive)) => { @@ -158,7 +150,7 @@ pub fn remote(task: Task) { server::file::remove_tar(&archive_name); } Err(err) => { - crate::log!("fatal", "{err}"); + crate::log!(Level::Fatal, "{err}"); break; } _ => (), diff --git a/crates/maid/client/src/structs.rs b/crates/maid/client/src/structs.rs index 2446afe..5b14cbb 100644 --- a/crates/maid/client/src/structs.rs +++ b/crates/maid/client/src/structs.rs @@ -1,5 +1,4 @@ use serde::{Deserialize, Serialize}; -use serde_json::Value as JsonValue; use std::collections::BTreeMap; use std::path::PathBuf; use toml::Value as TomlValue; @@ -110,11 +109,33 @@ pub struct DisplayTask { pub hidden: bool, } +#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq, Hash)] +pub enum Level { + None, + Fatal, + Docker, + Debug, + Error, + Notice, + Info, + Build, + Warning, + Success, +} + +#[derive(Clone, Debug, Deserialize, Serialize)] +pub enum Kind { + Done, + Binary, + Message, +} + #[derive(Clone, Debug, Deserialize, Serialize)] pub struct Websocket { - pub level: String, + pub level: Level, + pub kind: Kind, pub time: i64, - pub data: JsonValue, + pub message: Option<String>, } #[derive(Clone, Debug, Deserialize, Serialize)] diff --git a/crates/maid/server/src/docker/run.rs b/crates/maid/server/src/docker/run.rs index 898587d..b024118 100644 --- a/crates/maid/server/src/docker/run.rs +++ b/crates/maid/server/src/docker/run.rs @@ -1,232 +1,211 @@ -// use crate::structs::ConnectionData; -// use crate::table; -// -// use bollard::container::{Config, DownloadFromContainerOptions, RemoveContainerOptions, UploadToContainerOptions}; -// use bollard::exec::{CreateExecOptions, StartExecResults}; -// use bollard::image::CreateImageOptions; -// use bollard::{errors::Error, Docker}; -// use bytes::Bytes; -// use flate2::{write::GzEncoder, Compression}; -// use futures_core::Stream; -// use futures_util::stream::{SplitSink, SplitStream, TryStreamExt}; -// use futures_util::{SinkExt, StreamExt}; -// use macros_rs::{fmtstr, str, string}; -// use std::default::Default; -// use std::io::Write; -// use std::path::PathBuf; -// use std::sync::Arc; -// use text_placeholder::Template; -// use tokio::sync::Mutex; -// -// pub async fn concat_byte_stream<S>(s: S) -> Result<Vec<u8>, Error> -// where -// S: Stream<Item = Result<Bytes, Error>>, -// { -// s.try_fold(Vec::new(), |mut acc, chunk| async move { -// acc.extend_from_slice(&chunk[..]); -// Ok(acc) -// }) -// .await -// } -// -// // add error handling to all the unwraps -// pub async fn exec(tx: SplitSink<WebSocket, Message>, mut rx: SplitStream<WebSocket>, docker: Docker) -> Result<(), Box<dyn std::error::Error + 'static>> { -// let mut parsed: Option<ConnectionData> = None; -// let tx_ref = Arc::new(Mutex::new(tx)); -// -// while parsed.is_none() { -// if let Some(result) = rx.next().await { -// let msg = result.unwrap(); -// match serde_json::from_str::<ConnectionData>(msg.to_str().unwrap()) { -// Ok(value) => { -// parsed = Some(value); -// } -// Err(err) => { -// eprintln!("Failed to deserialize JSON: {:?}", err); -// } -// } -// } -// } -// -// let parsed = parsed.unwrap(); -// let name = &parsed.info.name; -// -// println!("creating container for task [{name}]"); -// docker -// .create_image( -// Some(CreateImageOptions { -// from_image: str!(parsed.info.remote.image.clone()), -// ..Default::default() -// }), -// None, -// None, -// ) -// .for_each(|msg| { -// let tx_ref = Arc::clone(&tx_ref); -// -// async move { -// let msg = msg.as_ref().expect("Failed to get CreateImageInfo"); -// let formatted = format!("{} {}", msg.status.clone().unwrap_or_else(|| string!("Waiting")), msg.progress.clone().unwrap_or_else(|| string!(""))); -// -// let mut tx_lock = tx_ref.lock().await; -// tx_lock -// .send(Message::text( -// serde_json::to_string(&serde_json::json!({ -// "level": "docker", -// "time": chrono::Utc::now().timestamp_millis(), -// "data": { "message": formatted }, -// })) -// .unwrap(), -// )) -// .await; -// } -// }) -// .await; -// -// let config = -// Config { -// image: Some(parsed.info.remote.image), -// tty: Some(true), -// ..Default::default() -// }; -// -// let id = docker.create_container::<&str, String>(None, config).await?.id; -// println!("created container"); -// -// docker.start_container::<String>(&id, None).await?; -// println!("started container"); -// -// let tx_ref = Arc::clone(&tx_ref); -// let mut tx_lock = tx_ref.lock().await; -// -// tx_lock -// .send(Message::text( -// serde_json::to_string(&serde_json::json!({ -// "level": "success", -// "time": chrono::Utc::now().timestamp_millis(), -// "data": { "binary": true }, -// })) -// .unwrap(), -// )) -// .await -// .unwrap(); -// -// if let Some(result) = rx.next().await { -// println!("received message: binary"); -// -// let msg = result.unwrap(); -// fn bytes_to_body(bytes: &[u8]) -> Body { Body::from(bytes.to_vec()) } -// -// // note: this `Result` may be an `Err` variant, which should be handled -// // help: use `let _ = ...` to ignore the resulting value -// docker -// .upload_to_container(&id, Some(UploadToContainerOptions { path: "/opt", ..Default::default() }), bytes_to_body(&msg.as_bytes())) -// .await; -// println!("wrote tarfile to container"); -// } -// -// let dependencies = match &parsed.maidfile.tasks[&parsed.info.name].depends { -// Some(deps) => { -// let mut dep_script: Vec<String> = vec![]; -// for item in deps.iter() { -// dep_script.push( -// parsed.maidfile.tasks[item] -// .script -// .as_array() -// .map(|arr| arr.iter().map(|val| val.as_str().unwrap_or_default()).collect::<Vec<_>>().join("\n")) -// .unwrap_or_default(), -// ); -// } -// dep_script.join("\n") -// } -// None => { -// string!("") -// } -// }; -// -// // move common things such as structs and helpers to seperate crate -// let table = table::create(parsed.maidfile.clone(), &parsed.info.args, PathBuf::new().join("/opt")); -// let script = Template::new_with_placeholder(str!(parsed.info.script.join("\n")), "%{", "}").fill_with_hashmap(&table); -// let dependencies = Template::new_with_placeholder(str!(dependencies), "%{", "}").fill_with_hashmap(&table); -// -// let exec = docker -// .create_exec( -// &id, -// CreateExecOptions { -// attach_stdout: Some(true), -// attach_stderr: Some(true), -// cmd: Some(vec![ -// str!(parsed.info.remote.shell), -// "-c", -// fmtstr!("cd /opt && touch script.sh && echo '{dependencies}\n{script}' > script.sh && chmod +x script.sh && ./script.sh"), -// ]), -// ..Default::default() -// }, -// ) -// .await? -// .id; -// -// if let StartExecResults::Attached { mut output, .. } = docker.start_exec(&exec, None).await? { -// tx_lock -// .send(Message::text( -// serde_json::to_string(&serde_json::json!({ -// "level": "build", -// "time": chrono::Utc::now().timestamp_millis(), -// "data": { "message": "waiting for build to finish..." }, -// })) -// .unwrap(), -// )) -// .await -// .unwrap(); -// -// while let Some(Ok(msg)) = output.next().await { -// if !parsed.info.remote.silent { -// tx_lock -// .send(Message::text( -// serde_json::to_string(&serde_json::json!({ -// "level": "none", -// "time": chrono::Utc::now().timestamp_millis(), -// "data": { "message": msg.to_string() }, -// })) -// .unwrap(), -// )) -// .await -// .unwrap(); -// } -// } -// } -// -// let res = -// docker.download_from_container( -// &id, -// Some(DownloadFromContainerOptions { -// path: fmtstr!("/opt/{}", parsed.info.remote.pull.clone()), -// }), -// ); -// let bytes = concat_byte_stream(res).await?; -// let mut encoder = GzEncoder::new(Vec::new(), Compression::default()); -// -// encoder.write_all(&bytes)?; -// let compressed_data = encoder.finish()?; -// -// tx_lock.send(Message::binary(compressed_data)).await.unwrap(); -// println!("sent message: binary, from [{}]", parsed.info.remote.pull); -// -// tx_lock -// .send(Message::text( -// serde_json::to_string(&serde_json::json!({ -// "level": "success", -// "time": chrono::Utc::now().timestamp_millis(), -// "data": { "done": true }, -// })) -// .unwrap(), -// )) -// .await -// .unwrap(); -// println!("sent message: [done]"); -// -// println!("deleted old container"); -// // delete container if socket closed -// docker.remove_container(&id, Some(RemoveContainerOptions { force: true, ..Default::default() })).await?; -// Ok(()) -// } +macro_rules! Handle { + ($id:ident, $socket:ident, $expr:expr $(, || $callback:expr)?) => { + $( $callback; )? + if let Err(err) = $expr { + log::error!("{err}"); + $socket.remove_container(&$id, Some(RemoveContainerOptions { force: true, ..Default::default() })).await?; + log::warn!("removed old container"); + } + }; +} + +use crate::{structs::ConnectionData, table, Kind, Level, Response}; +use bytes::Bytes; +use flate2::{write::GzEncoder, Compression}; +use futures_core::Stream; +use futures_util::{stream::TryStreamExt, SinkExt, StreamExt}; +use macros_rs::{fmtstr, str, string}; +use rocket_ws::{stream::DuplexStream, Message}; +use std::{default::Default, io::Write, path::PathBuf}; +use text_placeholder::Template; + +use bollard::{ + container::{Config, DownloadFromContainerOptions, RemoveContainerOptions, UploadToContainerOptions}, + errors::Error, + exec::{CreateExecOptions, StartExecResults}, + image::CreateImageOptions, + Docker, +}; + +pub async fn concat_byte_stream<S>(s: S) -> Result<Vec<u8>, Error> +where + S: Stream<Item = Result<Bytes, Error>>, +{ + s.try_fold(Vec::new(), |mut acc, chunk| async move { + acc.extend_from_slice(&chunk[..]); + Ok(acc) + }) + .await +} + +pub async fn exec(mut stream: DuplexStream, docker: &Result<Docker, anyhow::Error>) -> Result<(), anyhow::Error> { + let socket = &docker.as_ref().unwrap(); + let mut parsed: Option<ConnectionData> = None; + + while parsed.is_none() { + if let Some(result) = stream.next().await { + match serde_json::from_str::<ConnectionData>(&result.unwrap().to_string()) { + Ok(value) => { + parsed = Some(value); + } + Err(err) => log::error!("Failed to deserialize JSON: {:?}", err), + } + } + } + + let parsed = parsed.unwrap(); + let name = &parsed.info.name; + let image = parsed.info.remote.image.clone(); + + log::info!("creating container (task={name}, image={})", image); + + let image_config = CreateImageOptions { + from_image: str!(image.clone()), + ..Default::default() + }; + + let mut container = socket.create_image(Some(image_config), None, None); + log::info!("image created"); + + while let Some(message) = container.next().await { + let message = message.as_ref().expect("Failed to get CreateImageInfo"); + let formatted = format!( + "{} {}", + message.status.clone().unwrap_or_else(|| string!("Waiting")), + message.progress.clone().unwrap_or_else(|| string!("")) + ); + + let docker_message = + Response { + level: Level::Docker, + message: Some(formatted), + kind: Kind::Message, + }; + + stream.send(docker_message.into()).await?; + } + + let config = Config { + image: Some(image), + tty: Some(true), + ..Default::default() + }; + + let id = socket.create_container::<&str, String>(None, config).await?.id; + log::info!("created container"); + + Handle!(id, socket, socket.start_container::<String>(&id, None).await, || log::info!("started container")); + + let binary_message = Response { + level: Level::Success, + kind: Kind::Binary, + message: None, + }; + + stream.send(binary_message.into()).await?; + + if let Some(result) = stream.next().await { + log::info!("received message: binary"); + + let msg = result?; + let bytes_to_body = |bytes: &[u8]| -> rocket::http::hyper::Body { rocket::http::hyper::Body::from(bytes.to_vec()) }; + let upload_options = UploadToContainerOptions { path: "/opt", ..Default::default() }; + + Handle!(id, socket, socket.upload_to_container(&id, Some(upload_options), bytes_to_body(&msg.into_data())).await); + log::info!("wrote tarfile to container"); + } + + let dependencies = match &parsed.maidfile.tasks[&parsed.info.name].depends { + Some(deps) => { + let mut dep_script: Vec<String> = vec![]; + for item in deps.iter() { + dep_script.push( + parsed.maidfile.tasks[item] + .script + .as_array() + .map(|arr| arr.iter().map(|val| val.as_str().unwrap_or_default()).collect::<Vec<_>>().join("\n")) + .unwrap_or_default(), + ); + } + dep_script.join("\n") + } + None => { + string!("") + } + }; + + // move common things such as structs and helpers to seperate crate + let table = table::create(parsed.maidfile.clone(), &parsed.info.args, PathBuf::new().join("/opt")); + let script = Template::new_with_placeholder(str!(parsed.info.script.join("\n")), "%{", "}").fill_with_hashmap(&table); + let dependencies = Template::new_with_placeholder(str!(dependencies), "%{", "}").fill_with_hashmap(&table); + + let exec = socket + .create_exec( + &id, + CreateExecOptions { + attach_stdout: Some(true), + attach_stderr: Some(true), + cmd: Some(vec![ + str!(parsed.info.remote.shell), + "-c", + fmtstr!("cd /opt && touch script.sh && echo '{dependencies}\n{script}' > script.sh && chmod +x script.sh && ./script.sh"), + ]), + ..Default::default() + }, + ) + .await? + .id; + + if let StartExecResults::Attached { mut output, .. } = socket.start_exec(&exec, None).await? { + let build_start_message = Response { + level: Level::Build, + kind: Kind::Message, + message: Some("waiting for build to finish..".to_string()), + }; + + Handle!(id, socket, stream.send(build_start_message.into()).await); + + while let Some(Ok(msg)) = output.next().await { + if !parsed.info.remote.silent { + let output_message = Response { + level: Level::None, + kind: Kind::Message, + message: Some(msg.to_string()), + }; + + Handle!(id, socket, stream.send(output_message.into()).await); + } + } + } + + let res = + socket.download_from_container( + &id, + Some(DownloadFromContainerOptions { + path: fmtstr!("/opt/{}", parsed.info.remote.pull.clone()), + }), + ); + + let bytes = concat_byte_stream(res).await?; + let mut encoder = GzEncoder::new(Vec::new(), Compression::default()); + + encoder.write_all(&bytes)?; + let compressed_data = encoder.finish()?; + + Handle!(id, socket, stream.send(Message::binary(compressed_data)).await); + log::info!("sent message: binary, from [{}]", parsed.info.remote.pull); + + let done_message = Response { + level: Level::Success, + kind: Kind::Done, + message: None, + }; + + stream.send(done_message.into()).await?; + log::info!("sent message: [done]"); + + socket.remove_container(&id, Some(RemoveContainerOptions { force: true, ..Default::default() })).await?; + log::info!("removed old container"); + + Ok(()) +} diff --git a/crates/maid/server/src/main.rs b/crates/maid/server/src/main.rs index b06122f..fcec46b 100644 --- a/crates/maid/server/src/main.rs +++ b/crates/maid/server/src/main.rs @@ -6,9 +6,11 @@ mod table; use bollard::{Docker, API_DEFAULT_VERSION}; use docker::container; -use macros_rs::ternary; -use rocket::{get, launch, routes, State}; -use rocket_ws::{Config, Stream, WebSocket}; +use macros_rs::{fmtstr, ternary}; +use rocket::futures::SinkExt; +use rocket::{get, http::Status, launch, outcome::Outcome, routes, State}; +use rocket_ws::{Channel, Message, WebSocket}; +use serde::{Deserialize, Serialize}; use serde_json::{json, Value}; use std::env; @@ -16,8 +18,74 @@ struct DockerState { docker: Result<Docker, anyhow::Error>, } +#[derive(Serialize, Deserialize)] +enum Level { + None, + Fatal, + Docker, + Debug, + Error, + Notice, + Info, + Build, + Warning, + Success, +} + +#[derive(Serialize, Deserialize)] +enum Kind { + Done, + Binary, + Message, +} + +struct Response { + level: Level, + kind: Kind, + message: Option<String>, +} + +impl Response { + fn to_string(&self) -> String { + let json_value = serde_json::json!({ + "kind": &self.kind, + "level": &self.level, + "message": &self.message, + "time": chrono::Utc::now().timestamp_millis(), + }); + + serde_json::to_string(&json_value).unwrap() + } +} + +impl From<Response> for Message { + fn from(response: Response) -> Self { Message::text(response.to_string()) } +} + +#[derive(Debug)] +struct Token(String); + +#[rocket::async_trait] +impl<'r> rocket::request::FromRequest<'r> for Token { + type Error = (); + + async fn from_request(request: &'r rocket::Request<'_>) -> rocket::request::Outcome<Self, Self::Error> { + let token = "test_token1".to_string(); + let authorization_header = request.headers().get_one("Authorization"); + + if let Some(header_value) = authorization_header { + if header_value == fmtstr!("Bearer {token}") { + let token = header_value.trim_start_matches("Bearer ").to_owned(); + return Outcome::Success(Token(token)); + } + } + + Outcome::Error((Status::Unauthorized, ())) + } +} + #[get("/api/health")] -async fn health(docker_state: &State<DockerState>) -> Value { +async fn health(docker_state: &State<DockerState>, _token: Token) -> Value { let socket = &docker_state.docker.as_ref().unwrap(); let info = socket.version().await.unwrap(); let containers = container::list(socket).await.unwrap(); @@ -55,29 +123,38 @@ async fn health(docker_state: &State<DockerState>) -> Value { }) } -#[get("/echo")] -fn stream(ws: WebSocket) -> Stream!['static] { - let ws = ws.config(Config { - max_send_queue: Some(5), - ..Default::default() - }); +#[get("/ws/gateway")] +fn stream(ws: WebSocket, docker_state: &State<DockerState>, _token: Token) -> Channel { + let connect_success = Response { + level: Level::Success, + kind: Kind::Message, + message: Some("client connected".to_string()), + }; - Stream! { ws => - for await message in ws { - yield message?; - } - } + ws.channel(move |mut stream| { + Box::pin(async move { + stream.send(connect_success.into()).await?; + + match docker::run::exec(stream, &docker_state.docker).await { + Ok(_) => log::info!("build finished"), + Err() => log::error!("failed to build"), + }; + + Ok(()) + }) + }) } #[launch] #[tokio::main] async fn rocket() -> _ { - globals::init(); - let http = true; - let token = "test_token".to_string(); std::env::set_var("ROCKET_PORT", "3500"); + std::env::set_var("RUST_LOG", "info"); + + globals::init(); + env_logger::init(); let socket = async move { let socket = match http { @@ -93,30 +170,3 @@ async fn rocket() -> _ { rocket::build().manage(DockerState { docker: docker_socket }).mount("/", routes![health, stream]) } - -// #[tokio::main] -// async fn main() -> { -// // let connection = Docker::connect_with_http( -// // "http://my-custom-docker-server:2735", 4, API_DEFAULT_VERSION) -// // .unwrap(); -// -// // let session = SessionBuilder::default() -// // .user("root".to_string()) -// // .port(22) -// // .known_hosts_check(KnownHosts::Accept) -// // .control_directory(std::env::temp_dir()) -// // .connect_timeout(Duration::from_secs(5)) -// // .connect("100.79.107.11") -// // .await; -// -// let auth = warp::header::exact("Authorization", fmtstr!("Bearer {}", token)); -// let health = warp::path!("api" / "health").and_then(health_handler); -// -// -// -// -// -// let routes = health.or(gateway).and(auth); -// -// Ok(warp::serve(routes).run(([0, 0, 0, 0], port)).await) -// } -- GitLab