commit def10a79126600f0ace02554abe685c6307406d6 Author: william Date: Sun Mar 19 22:42:08 2023 -0400 Init diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..ea8c4bf --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +/target diff --git a/.idea/.gitignore b/.idea/.gitignore new file mode 100644 index 0000000..13566b8 --- /dev/null +++ b/.idea/.gitignore @@ -0,0 +1,8 @@ +# Default ignored files +/shelf/ +/workspace.xml +# Editor-based HTTP Client requests +/httpRequests/ +# Datasource local storage ignored files +/dataSources/ +/dataSources.local.xml diff --git a/.idea/kvm.iml b/.idea/kvm.iml new file mode 100644 index 0000000..ff5191c --- /dev/null +++ b/.idea/kvm.iml @@ -0,0 +1,15 @@ + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/.idea/modules.xml b/.idea/modules.xml new file mode 100644 index 0000000..f321afb --- /dev/null +++ b/.idea/modules.xml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/.idea/vcs.xml b/.idea/vcs.xml new file mode 100644 index 0000000..35eb1dd --- /dev/null +++ b/.idea/vcs.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock new file mode 100644 index 0000000..5ba3082 --- /dev/null +++ b/Cargo.lock @@ -0,0 +1,24 @@ +# This file is automatically @generated by Cargo. +# It is not intended for manual editing. +version = 3 + +[[package]] +name = "client" +version = "0.1.0" + +[[package]] +name = "libc" +version = "0.2.140" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "99227334921fae1a979cf0bfdfcc6b3e5ce376ef57e16fb6fb3ea2ed6095f80c" + +[[package]] +name = "messages" +version = "0.1.0" + +[[package]] +name = "server" +version = "0.1.0" +dependencies = [ + "libc", +] diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..e1f5593 --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,2 @@ +[workspace] +members = ["client", "server", "messages"] diff --git a/client/Cargo.toml b/client/Cargo.toml new file mode 100644 index 0000000..729587b --- /dev/null +++ b/client/Cargo.toml @@ -0,0 +1,8 @@ +[package] +name = "client" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] diff --git a/client/src/main.rs b/client/src/main.rs new file mode 100644 index 0000000..69010c0 --- /dev/null +++ b/client/src/main.rs @@ -0,0 +1,25 @@ +use std::io; +use std::net::{SocketAddr, UdpSocket}; +use std::thread::sleep; +use std::time::Duration; + +fn main() -> io::Result<()> { + let bind_addr = SocketAddr::from(([127, 0, 0, 1], 4432)); + let server_addr = SocketAddr::from(([127, 0, 0, 1], 4433)); + + let socket = UdpSocket::bind(bind_addr)?; + socket.connect(server_addr)?; + + let mut buffer: [u8; 128] = [0; 128]; + for i in 1..10 { + for j in 1..128 { + buffer[j] = j as u8; + } + + socket.send(&buffer)?; + println!("{}", i); + sleep(Duration::from_millis(1000)); + } + + Ok(()) +} diff --git a/messages/Cargo.toml b/messages/Cargo.toml new file mode 100644 index 0000000..daa857d --- /dev/null +++ b/messages/Cargo.toml @@ -0,0 +1,8 @@ +[package] +name = "messages" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] diff --git a/messages/src/client_registration.rs b/messages/src/client_registration.rs new file mode 100644 index 0000000..f602842 --- /dev/null +++ b/messages/src/client_registration.rs @@ -0,0 +1,19 @@ +use crate::Serializable; + +struct ClientRegistration { + name: str +} + +impl Serializable for ClientRegistration { + fn serialize(&self) -> Vec { + let mut buf: Vec = Vec::new(); + + buf.extend_from_slice(self.name.as_bytes()); + + buf + } + + fn deserialize(buf: &[u8]) -> Self { + String::from_utf8() + } +} diff --git a/messages/src/lib.rs b/messages/src/lib.rs new file mode 100644 index 0000000..34e5a80 --- /dev/null +++ b/messages/src/lib.rs @@ -0,0 +1,6 @@ +mod client_registration; + +trait Serializable { + fn serialize(&self) -> Vec; + fn deserialize(buf: Vec) -> Self; +} diff --git a/server/Cargo.toml b/server/Cargo.toml new file mode 100644 index 0000000..ccf6525 --- /dev/null +++ b/server/Cargo.toml @@ -0,0 +1,7 @@ +[package] +name = "server" +version = "0.1.0" +edition = "2021" + +[dependencies] +libc = "0.2.140" diff --git a/server/src/epoll.rs b/server/src/epoll.rs new file mode 100644 index 0000000..10a29f3 --- /dev/null +++ b/server/src/epoll.rs @@ -0,0 +1,105 @@ +use std::io; +use std::os::fd::RawFd; + +use libc::epoll_event; + +pub struct Epoll { + fd: RawFd, + pub events: Vec, +} + +const READ_FLAGS: i32 = libc::EPOLLONESHOT | libc::EPOLLIN; +const WRITE_FLAGS: i32 = libc::EPOLLONESHOT | libc::EPOLLOUT; + +const EVENTS_CAPACITY: usize = 1024; +const WAIT_MAX_EVENTS: i32 = 1024; +const WAIT_TIMEOUT: i32 = 1000; + +impl Epoll { + pub fn create() -> io::Result { + match epoll_create() { + Ok(fd) => Ok(Epoll { + fd, + events: Vec::with_capacity(EVENTS_CAPACITY), + }), + Err(e) => Err(e), + } + } + + pub fn add_read_interest(&self, fd: RawFd, key: u64) -> io::Result<()> { + add_interest(self.fd, fd, listener_read_event(key)) + } + + pub fn modify_read_interest(&self, fd: RawFd, key: u64) -> io::Result<()> { + modify_interest(self.fd, fd, listener_read_event(key)) + } + + pub fn wait(&mut self) -> io::Result<()> { + self.events.clear(); + match epoll_wait(self.fd, &mut self.events, WAIT_MAX_EVENTS, WAIT_TIMEOUT) { + Ok(res) => { + // safe as long as the kernel does nothing wrong - copied from mio + unsafe { self.events.set_len(res) } + Ok(()) + } + Err(e) => Err(e) + } + } +} + +pub fn is_read_event(event: u32) -> bool { + event as i32 & libc::EPOLLIN == libc::EPOLLIN +} + +pub fn is_write_event(event: u32) -> bool { + event as i32 & libc::EPOLLOUT == libc::EPOLLOUT +} + +macro_rules! syscall { + ($fn: ident ( $($arg: expr),* $(,)* ) ) => {{ + let res = unsafe { libc::$fn($($arg, )*) }; + if res == -1 { + Err(std::io::Error::last_os_error()) + } else { + Ok(res) + } + }}; +} + +fn epoll_create() -> io::Result { + let fd = syscall!(epoll_create1(0))?; + if let Ok(flags) = syscall!(fcntl(fd, libc::F_GETFD)) { + let _ = syscall!(fcntl(fd, libc::F_SETFD, flags | libc::FD_CLOEXEC)); + } + + Ok(fd) +} + +fn epoll_wait(epoll_fd: RawFd, events: &mut Vec, max_events: i32, timeout: i32) -> io::Result { + match syscall!(epoll_wait( + epoll_fd, + events.as_mut_ptr() as *mut libc::epoll_event, + max_events, + timeout as libc::c_int + )) { + Ok(v) => Ok(v as usize), + Err(e) => Err(e) + } +} + +fn add_interest(epoll_fd: RawFd, fd: RawFd, mut event: epoll_event) -> io::Result<()> { + syscall!(epoll_ctl(epoll_fd, libc::EPOLL_CTL_ADD, fd, &mut event))?; + Ok(()) +} + +fn modify_interest(epoll_fd: RawFd, fd: RawFd, mut event: epoll_event) -> io::Result<()> { + syscall!(epoll_ctl(epoll_fd, libc::EPOLL_CTL_MOD, fd, &mut event))?; + Ok(()) +} + +fn listener_read_event(key: u64) -> epoll_event { + epoll_event { + events: READ_FLAGS as u32, + u64: key, + } +} diff --git a/server/src/main.rs b/server/src/main.rs new file mode 100644 index 0000000..51f254e --- /dev/null +++ b/server/src/main.rs @@ -0,0 +1,33 @@ +use std::io; +use std::net::{SocketAddr, TcpStream}; + +use crate::tcp_server::TcpServer; + +mod epoll; +mod tcp_server; + +#[derive(Debug)] +pub struct RequestContext { + pub stream: TcpStream, + pub content_length: usize, + pub buf: Vec, +} + +impl RequestContext { + fn new(stream: TcpStream) -> Self { + Self { + stream, + buf: Vec::new(), + content_length: 0, + } + } +} + +fn main() -> io::Result<()> { + let addr = SocketAddr::from(([127, 0, 0, 1], 4433)); + let mut server = TcpServer::new(addr)?; + + server.listen()?; + + Ok(()) +} diff --git a/server/src/tcp_server.rs b/server/src/tcp_server.rs new file mode 100644 index 0000000..ae5aae6 --- /dev/null +++ b/server/src/tcp_server.rs @@ -0,0 +1,92 @@ +use std::collections::HashMap; +use std::io; +use std::net::{SocketAddr, TcpListener}; +use std::os::fd::{AsRawFd, RawFd}; + +use crate::epoll::{Epoll, is_read_event, is_write_event}; +use crate::RequestContext; + +const KEY_NEW_CONNECTION: u64 = 100; + +pub struct TcpServer { + addr: SocketAddr, + listener: TcpListener, + listener_fd: RawFd, + epoll: Epoll, + request_contexts: HashMap, + key: u64, +} + +impl TcpServer { + pub fn new(addr: SocketAddr) -> io::Result { + let listener = TcpListener::bind(addr)?; + listener.set_nonblocking(true)?; + + let listener_fd = listener.as_raw_fd(); + + let epoll = Epoll::create()?; + epoll.add_read_interest(listener_fd, KEY_NEW_CONNECTION)?; + + Ok(TcpServer { + addr, + listener, + listener_fd, + epoll, + request_contexts: HashMap::new(), + key: KEY_NEW_CONNECTION, + }) + } + + pub fn listen(&mut self) -> io::Result<()> { + println!("Listening on {}", self.addr); + + loop { + self.epoll.wait().expect("Failed to wait for epoll event"); + + let events = &self.epoll.events.iter() + .map(|event| (event.events, event.u64)) + .collect::>(); + + for (events, u64) in events { + match *u64 { + KEY_NEW_CONNECTION => self.accept_connection()?, + key => self.handle_event(*events, key) + } + } + } + } + + fn accept_connection(&mut self) -> io::Result<()> { + match self.listener.accept() { + Ok((stream, addr)) => { + stream.set_nonblocking(true)?; + println!("New client: {addr}"); + self.key += 1; + self.epoll.add_read_interest(stream.as_raw_fd(), self.key)?; + self.request_contexts.insert(self.key, RequestContext::new(stream)); + } + Err(e) => eprintln!("Couldn't accept: {e}") + }; + + self.epoll.modify_read_interest(self.listener_fd, KEY_NEW_CONNECTION) + } + + fn handle_event(&mut self, events: u32, key: u64) { + let mut to_delete = None; + if let Some(context) = self.request_contexts.get_mut(&key) { + match events { + v if is_read_event(v) => { + // context.read_cb(key, epoll_fd)?; + } + v if is_write_event(v) => { + // context.write_cb(key, epoll_fd)?; + to_delete = Some(key); + } + v => println!("Unexpected event: {v}"), + } + } + if let Some(key) = to_delete { + self.request_contexts.remove(&key); + } + } +}