Skip to content

Commit

Permalink
async file handling
Browse files Browse the repository at this point in the history
  • Loading branch information
alembiq committed Jun 28, 2024
1 parent 5163914 commit 3d25e9e
Show file tree
Hide file tree
Showing 13 changed files with 136 additions and 25 deletions.
11 changes: 11 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@
[15](https://www.dropbox.com/scl/fi/qpdeda3kx646jnwaf8j1s/Lecture-15.MP4?rlkey=b4ng5msydqt5zpm3pz05lqmm7&st=edb0in9i&dl=0&authuser=0)
[16](https://www.dropbox.com/scl/fi/3hf3ystbijpvu57f48yua/Lecture-16.MP4?rlkey=qntbm0gmjrt5xr3hjom3drktl&st=t3t5gzuj&dl=0&authuser=0)
[17](https://www.dropbox.com/scl/fi/m182zj2hjvn95dckx3616/Lecture-17.MP4?rlkey=9k5f1qub18fkme81v6x1jzozk&st=pg3833ad&dl=0&authuser=0)
18 19 20
[18](https://www.dropbox.com/scl/fi/zglhfknjwa39ywefwjox7/Lecture-18.MP4?rlkey=fmb6j1fmzk1zr1imyxbrivipr&st=iutxkn02&dl=0&authuser=0)
19 20

- **DevStreams**:
[1](https://www.dropbox.com/scl/fi/t7wfwubrl0361bjwxd5vx/DevStream-1.MP4?rlkey=4i319jzky1eqa3omcbpziuqum&st=t93zhcji&dl=0&authuser=0)
Expand Down
2 changes: 1 addition & 1 deletion lesson13/server13/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ fn listen_and_accept(address: String) -> Result<()> {
"{} failed to send message: {message:?} -> {e}",
current_time()
);
peers_to_remove.push(peer_addr.clone());
peers_to_remove.push(*peer_addr);
}
}

Expand Down
2 changes: 2 additions & 0 deletions lesson15/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,5 @@ time = "0.3.36"
parking_lot = "0.12.3"
eyre = "0.6.12"
tokio = { version = "1.38.0", features = ["full"] }
bincode = "1.3.3"
anyhow = "1.0.86"
16 changes: 16 additions & 0 deletions lesson15/files/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
[package]
name = "lesson15"
version = "0.1.0"
edition = "2021"

[dependencies]
ciborium = "0.2.2"
image = "0.25.1"
serde = { version = "1.0.202", features = ["derive"] }
thiserror = "1.0.61"
time = "0.3.36"
parking_lot = "0.12.3"
eyre = "0.6.12"
tokio = { version = "1.38.0", features = ["full"] }
bincode = "1.3.3"
anyhow = "1.0.86"
Binary file added lesson15/images/1719588157.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added lesson15/images/1719588650.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added lesson15/images/1719588730.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
17 changes: 6 additions & 11 deletions lesson15/src/bin/client.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use eyre::{anyhow, bail, Context, Result};

use lesson15::{
directory_create, file_read, filename_from_input, image_to_png, message_incomming,
message_outgoing, server_address, timestamp, MessageType, DIRECTORY_FILES, DIRECTORY_IMAGES,
async_file_read, async_file_write, directory_create, filename_from_input, image_to_png,
message_incoming, message_outgoing, server_address, timestamp, MessageType, DIRECTORY_FILES,
DIRECTORY_IMAGES,
};

use std::env;
Expand Down Expand Up @@ -63,7 +64,7 @@ fn send_to_stream(mut stream: TcpStream) -> Result<JoinHandle<Result<()>>> {
}
".file" => MessageType::File(
filename_from_input(trimmed_input)?.to_string(),
file_read(trimmed_input.to_string()),
async_file_read(trimmed_input),
),
".image" => {
MessageType::Image(image_to_png(filename_from_input(trimmed_input)?))
Expand All @@ -87,7 +88,7 @@ fn read_from_stream(mut stream: TcpStream) -> JoinHandle<()> {
directory_create(DIRECTORY_FILES);
directory_create(DIRECTORY_IMAGES);
thread::spawn(move || loop {
let message = match message_incomming(&mut stream) {
let message = match message_incoming(&mut stream) {
Ok(res) => res,
Err(e) => {
eprintln!("{} Stream inter: {e}", timestamp());
Expand All @@ -101,15 +102,9 @@ fn read_from_stream(mut stream: TcpStream) -> JoinHandle<()> {
println!("{} {text:?}", timestamp());
}
MessageType::File(name, content) => {
//TODO unable to save
//TODO file already exist
fs::write(format!("{}/{}", DIRECTORY_FILES, name), content)
.expect("Could not write file");
println!("{} Receiving {name}", timestamp());
async_file_write(format!("{}/{}", DIRECTORY_FILES, name), content);
}
MessageType::Image(image) => {
//TODO unable to save
//TODO file already exist
let image_file: String = std::time::UNIX_EPOCH
.elapsed()
.unwrap()
Expand Down
63 changes: 56 additions & 7 deletions lesson15/src/bin/server.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
use eyre::{bail, Result};
use parking_lot::Mutex;
// use tokio::io::{AsyncReadExt, AsyncWriteExt};
// use tokio::net::{TcpListener, TcpStream, ToSocketAddrs};
// use tokio::sync::broadcast;

use lesson15::{message_incomming, message_outgoing, server_address, timestamp, MessageType};
use lesson15::{message_incoming, message_outgoing, server_address, timestamp, MessageType};

use std::collections::HashMap;
use std::env;
Expand All @@ -13,11 +16,57 @@ use std::thread;
async fn main() -> Result<()> {
let server_address: String = server_address(env::args().collect());
println!("{} Starting server on {}!", timestamp(), server_address);
listen_and_accept(server_address)?;
listen_and_accept(server_address).await?;
Ok(())
}

fn listen_and_accept(address: String) -> Result<()> {
async fn listen_and_accept(address: String) -> Result<()> {
// let listener = TcpListener::bind(&address)
// .await
// .context("Unable to listen")?;
// println!("{} Server running on {}", timestamp(), address);

// loop {
// // Accept incoming connections
// let (mut socket, addr) = match listener.accept().await {
// Ok((socket, addr)) => (socket, addr),
// Err(e) => {
// eprintln!("{} Failed to accept connection: {}", timestamp(), e);
// continue;
// }
// };
// println!("{} New connection from {}", timestamp(), addr);

// // Spawn a new task for each connection
// tokio::spawn(async move {
// let mut buffer = [0; 1024];
// // Read data from the socket
// loop {
// match socket.read(&mut buffer).await {
// Ok(0) => {
// // Connection was closed
// println!("{} Connection closed {}", timestamp(), addr);
// return;
// }
// Ok(n) => {
// //TODO save received data to db
// //TODO forward to all connected clients
// println!("{} {} data", timestamp(), addr);
// // Echo the data back to the client
// if let Err(e) = socket.write_all(&buffer[..n]).await {
// eprintln!("{} Failed to write to socket: {}", timestamp(), e);
// return;
// }
// }
// Err(e) => {
// eprintln!("{} Failed to read from socket: {}", timestamp(), e);
// return;
// }
// }
// }
// });
// }
// }
let listener = match TcpListener::bind(address) {
Ok(l) => l,
Err(e) => {
Expand All @@ -39,7 +88,7 @@ fn listen_and_accept(address: String) -> Result<()> {
let clients_clone = clients.clone();

thread::spawn(move || loop {
let message = match message_incomming(&mut stream) {
let message = match message_incoming(&mut stream) {
Ok(msg) => msg,
Err(e) => {
eprintln!("{} {addr} stream interrupted: {e}", timestamp());
Expand All @@ -57,7 +106,7 @@ fn listen_and_accept(address: String) -> Result<()> {

if let Err(e) = message_outgoing(peer_stream, &message) {
eprintln!("{} failed to send message: {message:?} -> {e}", timestamp());
peers_to_remove.push(peer_addr.clone());
peers_to_remove.push(*peer_addr);
}
}

Expand All @@ -76,12 +125,12 @@ fn listen_and_accept(address: String) -> Result<()> {
println!("{} {addr} sending: {}", timestamp(), name);
}
MessageType::Image(_image) => {
let image_file: String = std::time::UNIX_EPOCH
let file_name: String = std::time::UNIX_EPOCH
.elapsed()
.unwrap()
.as_secs()
.to_string();
println!("{} {addr} sending: {}.png", timestamp(), image_file);
println!("{} {addr} sending: {}.png", timestamp(), file_name);
}
}
});
Expand Down
23 changes: 18 additions & 5 deletions lesson15/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@ use image::codecs::png::PngEncoder;
use image::ImageEncoder;
use serde::{Deserialize, Serialize};
use thiserror::Error;
// use tokio::io::{AsyncReadExt, AsyncWriteExt};
// use tokio::net::TcpStream;
use tokio::io::{AsyncReadExt, AsyncWriteExt};

use std::error::Error;
use std::fs::{self};
Expand Down Expand Up @@ -67,7 +66,7 @@ pub enum MessageType {
Text(String),
}

pub fn message_incomming(stream: &mut TcpStream) -> Result<MessageType, ErrorMessage> {
pub fn message_incoming(stream: &mut TcpStream) -> Result<MessageType, ErrorMessage> {
let mut len_bytes = [0; 4];
stream.read_exact(&mut len_bytes)?;
let len = u32::from_be_bytes(len_bytes) as usize;
Expand All @@ -91,10 +90,24 @@ pub fn message_outgoing(

/// FILE HANDLING
pub fn file_read(input: String) -> Vec<u8> {
#[tokio::main]
pub async fn async_file_read(input: &str) -> Vec<u8> {
let mut filename = input.split_whitespace();
let filename: &str = filename.nth(1).expect("missing filename");
std::fs::read(format!("./{}", filename)).unwrap()
let mut file = tokio::fs::File::open(format!("./{}", filename))
.await
.unwrap();
let mut contents = Vec::new();
file.read_to_end(&mut contents).await.unwrap();
contents
}

#[tokio::main]
pub async fn async_file_write(filename: String, data: Vec<u8>) {
let mut file = tokio::fs::File::create(format!("./{}", filename))
.await
.unwrap();
file.write_all(data.as_ref()).await.unwrap();
}

pub fn directory_create(directory: &str) {
Expand Down
17 changes: 17 additions & 0 deletions lesson17/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
version: '3'
services:
prometheus:
image: prom/prometheus
volumes:
- ./prometheus.yml:/etc/prometheus/prometheus.yml
ports:
- "9090:9090"

grafana:
image: grafana/grafana
environment:
- GF_SECURITY_ADMIN_PASSWORD=admin
ports:
- "3123:3000"
depends_on:
- prometheus
7 changes: 7 additions & 0 deletions lesson17/prometheus.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
global:
scrape_interval: 5s

scrape_configs:
- job_name: 'rust_application'
static_configs:
- targets: ['<url>:10002']

0 comments on commit 3d25e9e

Please sign in to comment.