Всем привет! Это руководство о том, как написать небольшой веб-сервер для многопользовательской мини-игры. Суть игры проста — вы обмениваетесь файлами с другими игроками в небольшом 2D-пространстве. Полный код приложения находится в гитхабвы можете протестировать его на Веб-сайт.
Contents
Начинать
Большинство объяснений описываются как комментарии к коду.
Создайте проект с новой командой груза
[dependencies]
# Рантайм для асинхронного кода
tokio = { version = "1.28.1", features = ["full"] }
# Асинхронная версия библиотеки tungstenite, реализующей протокол WebSocket,
# работающая в связке с библиотекой tokio.
tokio-tungstenite = "0.18.0"
# Вспомогательные методы для работы с массивами байтов.
bstr = "1.4.0"
# Вспомогательные метода для работы асинхронными объектами.
futures-util = "0.3"
# Библиотека для работы регулярными выражениями.
regex = "1"
# Mutex, RwLock и тд. быстрее чем в стандартной библиотеке.
parking_lot = "0.12.1"
# Генерирует рандомные числа.
rand = "0.8.5"
Основные типы
Состав UserFileMessage
– хранит информацию о файле, переданную пользователем, а также имеет метод, позволяющий разобрать эту информацию из байтового массива. Будет использоваться при получении бинарного сообщения (один из типов сообщений протокола WebSocket) от пользователя для проверки его достоверности.
Состав Connection
– хранит логин ID, а также сам логин, в который записывается информация и передается вошедшему в систему пользователю.
перечисление BroadcastEvents
– содержит список событий, которые могут быть отправлены сразу всеми пользователями. Необходимо разделить по типам сообщения, получаемые вещателем (подробнее ниже).
Код типа (Fileserver/src/types.rs)
// Файл server/src/types.rs
// Трейт с вспомогательными методами для массима байтов
use bstr::ByteSlice;
// SplitSink это Sink часть разделённого на два объекта потока.
// Подробнее позже
use futures_util::stream::SplitSink;
// Асинхронный TCP стрим
use tokio::net::TcpStream;
// Тип WebSocket потока и сообщения из него
use tokio_tungstenite::{tungstenite::Message, WebSocketStream};
pub struct UserFileMessage {
// Имя пользователя отправившего файл
pub username: String,
// Имя файла
pub file_name: String,
// Content-Type файла
pub file_type: String,
// Сам файл
pub file_bytes: Vec<u8>,
}
impl UserFileMessage {
// Метод из массива байтов собирает структуру UserFileMessage.
pub fn from(data: Vec<u8>) -> Option<Self> {
// Паттерн разделителя, который отделяет каждый фрагмент
// информации от другого. Например:
// username<pattern>filename<pattern>filetype<pattern>filebytes
let pattern: [u8; 12] = [226, 128, 147, 226, 128, 147, 226, 128, 147, 226, 128, 147];
// Делит массив по паттерну
let result: Vec<Vec<u8>> = data.split_str(&pattern).map(|x|x.to_vec()).filter(|x| x.len() > 0).collect();
// Если разделение прошло на 4 элемента, то все прошло успешно.
// Если нет, то данные переданы некорректно и нужно возвращать None.
if result.len() == 4 {
let username = String::from_utf8_lossy(&result[0]).to_string();
let file_name = String::from_utf8_lossy(&result[1]).to_string();
let file_type = String::from_utf8_lossy(&result[2]).to_string();
let file_bytes = result[3].clone();
Some(UserFileMessage { username, file_name, file_type, file_bytes })
} else {
None
}
}
}
#[derive(Debug)]
pub struct Connection {
// Рандомный идентификатор соединения
pub id: u32,
// Sink часть websocket потока для передачи в него информации
pub con: SplitSink<WebSocketStream<TcpStream>, Message>,
}
#[derive(Debug)]
pub enum BroadcastEvents {
// Добавляет подключение в broadcast'ер
AddConn(Connection),
// Отправка всем участникам сообщения о подключении нового
Join(String),
// Отправка всем участникам сообщения о выходе игрока.
// Так же передаём первым элементом идентификатор его
// подключения, чтобы удалить его из списка.
Quit(u32, String),
// Отправка всем участникам полученного от одного из них файла.
SendFile(Message)
}
Транслировать
Вещание – это функция, которая будет работать в отдельном токио Task’е, получать сигналы от других потоков и в зависимости от полученного сигнала отправлять разные сообщения всем подключенным игрокам.
Задача в токио – зеленый (виртуальный) тред. Он управляется средой выполнения tokyo, а не ОС.
Широковещательный код (Fileserver/src/broadcast.rs)
// Файл server/src/broadcast.rs
use std::collections::HashMap;
// Трейт добавляющий удобные методы для Sink объектов (в
// нашем случае Sink'ом будет, полученная после деления, вторая часть
// потока WebSocketStream, в которую мы будем записывать данные, для
// передачи их пользователю).
use futures_util::SinkExt;
// Тип получателя для неограниченного mpsc канала
use tokio::sync::mpsc::UnboundedReceiver;
// Тип сообщения, передаваемого по WebSocket'у
use tokio_tungstenite::tungstenite::Message;
use crate::types::{BroadcastEvents, Connection};
pub async fn run(mut rx: UnboundedReceiver<BroadcastEvents>) {
let mut connections: HashMap<u32, Connection> = HashMap::new();
// В цикле ждём новый сигнал
while let Some(event) = rx.recv().await {
match event {
// Добавляем соединение в список для рассылки
BroadcastEvents::AddConn(conn) => {
connections.insert(conn.id, conn);
}
// Рассылаем всем юзерам сообщение о подключении нового игрока
// Пример сообщения: JOIN|userA
BroadcastEvents::Join(username) => {
for (_, iconn) in connections.iter_mut() {
let _ = iconn.con.send(Message::Text(format!("JOIN|{}", username))).await;
}
}
// Рассылаем всем юзерам сообщение о выходе игрока
// Пример сообщения: LEFT|userA
BroadcastEvents::Quit(id, username) => {
connections.remove(&id);
if !username.is_empty() {
for (_, conn) in connections.iter_mut() {
let _ = conn.con.send(Message::Text(format!("LEFT|{}", username))).await;
}
}
}
// Пересылаем юзерам сообщение с файлом.
// Так как сообщение для пересылки нужно клонировать и при
// большом количестве игроков это может вызвать замедления в работе
// функции, максимальный размер файла ограничен 5МБ. (Это будет ниже)
BroadcastEvents::SendFile(msg) => {
for (_, conn) in connections.iter_mut() {
let _ = conn.con.send(msg.clone()).await;
}
}
}
}
}
Состояния игрока
Для управления состоянием игроков создадим отдельную структуру Game
в котором мы будем хранить список подключенных пользователей и отправителя сигналов для трансляции.
Чтобы предотвратить файловый спам, у нас будет небольшой 15-секундный кулдаун для игроков, чтобы отправить файл. Таким образом, каждому игроку не будет присвоено ничего, кроме его имени и времени последней отправки файла.
Код структуры игры (Fileserver/src/game.rs)
// Файл server/src/game.rs
use std::{collections::HashMap, sync::Arc};
use parking_lot::Mutex;
use tokio::{sync::mpsc::UnboundedSender, time::Instant};
use tokio_tungstenite::tungstenite::Message;
use crate::types::{BroadcastEvents, Connection};
pub struct Player {
// Время последнего отправленного файла.
// Так как, когда пользователь только зашел, он ещё не успел отправить
// что-нибудь, записывать в это поле нечего. В таком случае будет
// выставлено значение енама Option – None.
pub dt_last_send: Option<Instant>,
}
#[derive(Clone)]
pub struct Game {
// Мапа со всеми подключенными играками (Формат: username:Player).
// Для того чтобы это поле было доступно для нескольких потоков,
// его необходимо обернуть в смарт-поинтер Arc, которые реализует
// множественное владение данными в нескольких потоках. А чтобы
// мапу можно было внутри Arc'а ещё и изменять, нужно дополнительно
// завернуть её в Mutex. Так же Mutex обезопасит данные, ограничив
// максимальное количество единовременно изменяемых мапу потоков до одного.
pub players: Arc<Mutex<HashMap<String, Player>>>,
// Отправитель сигналов в Broadcast
pub broadcast_sender: UnboundedSender<BroadcastEvents>
}
impl Game {
// Создаём новый инстанс
pub fn new(broadcast_sender: UnboundedSender<BroadcastEvents>) -> Game {
Game {
players: Arc::new(Mutex::new(HashMap::new())),
broadcast_sender,
}
}
// Добавляем игрока и отправляем сигнал в Broadcast.
// Вызов метода send вернёт Result, который мы не хотим в данном
// случае обрабатывать. Поэтому засунем его в игнорируемую переменную.
// Так же проверяем есть ли такой игрок уже на сервере и
// не превысит ли количество игроков на сервере 50,
// если игрок подключится. (Максимум 50 игроков)
pub fn add_player(&self, username: String) {
let mut players = self.players.lock();
if !players.contains_key(&username) && players.len() < 50 {
players.insert(username.clone(), Player { dt_last_send: None });
let _ = self.broadcast_sender.send(BroadcastEvents::Join(username.clone()));
}
}
// Добавляем соединение в Broadcast
pub fn add_connection(&self, conn: Connection) {
let _ = self.broadcast_sender.send(BroadcastEvents::AddConn(conn));
}
// Удаляем игрока и отправляем сигнал в Broadcast
pub fn remove_player(&self, username: String, id: u32) {
self.players.lock().remove(&username);
let _ = self.broadcast_sender.send(BroadcastEvents::Quit(id, username));
}
// Формируем сообщение со списком пользователей.
// Пример: LIST|userA,userB,userC
pub fn get_list_message(&self) -> Message {
let list_string = self.players.lock().iter().map(|w| w.0.to_owned()).collect::<Vec<String>>().join(",");
Message::Text(format!("LIST|{}", list_string))
}
// Отправляем сообщение с файлом в Broadcast.
// Перед этим проверяем прошел или нет у пользователя
// cooldown на отправку файла.
pub fn send_file(&self, from: String, msg: Message) {
let mut players = self.players.lock();
if let Some(player) = players.get(&from) {
if player.dt_last_send.is_none() || player.dt_last_send.unwrap().elapsed().as_secs() > 15 {
let _ = self.broadcast_sender.send(BroadcastEvents::SendFile(msg));
players.insert(from, Player { dt_last_send: Some(Instant::now()) });
}
}
}
}
Получение запросов на подключение
Поскольку WebSocket работает поверх TCP, сначала нам просто нужно создать прослушивающий TCP-сокет, используя метод bind
к TcpListener’у, и в цикле принимать все запросы на подключение.
Для оптимизации обработки соединений предпочтительнее выводить их в отдельный поток. В нашем случае в Task’у Tokyo.
Код дома main.rs
// Файл server/src/main.rs
// Методы для удобного управления потоком подключения WebSocket
use futures_util::{StreamExt, SinkExt};
use tokio::net::{TcpStream, TcpListener};
// Фунция для принятия WebSocket соединения
use tokio_tungstenite::accept_async;
use tokio::sync::mpsc;
// Структура для работы с регулярными выражениями
use regex::Regex;
// Рандомайзер для чисел
use rand::Rng;
use types::{Connection, BroadcastEvents, UserFileMessage};
use game::Game;
mod types;
mod game;
mod broadcast;
// Обязательно навешиваем на main макрос tokio::main, который
// создаст рантайм токио для исполнения асинхронного кода.
#[tokio::main]
async fn main () {
// Создаём прослушивающий TCP сокет на порте 8080
let server = TcpListener::bind("0.0.0.0:8080").await.unwrap();
// Создаём неограниченный канал mpsc
let (broadcast_sender, broadcast_receiver) = mpsc::unbounded_channel::<BroadcastEvents>();
// Создаём новый поток токио и на выполнение ему передаём нашу
// функцию Broadcast.
tokio::spawn(broadcast::run(broadcast_receiver));
let game = Game::new(broadcast_sender);
// В цикле принимаем все запросы на соединение
loop {
let (stream, _) = server.accept().await.unwrap();
// Обрабатываем все соединения в отдельном токио потоке
tokio::spawn(process_con(stream, game.clone()));
}
}
// Функция обработки соединения
async fn process_con(stream: TcpStream, game: Game) {
println!("Connection!")
}
Управление подключением
Наша функция управления подключением будет работать в 2 этапа.
На первом этапе он отправит пользователю список всех подключенных игроков, добавит подключение в список подключений в вещателе, чтобы подключенный пользователь уже мог получать данные о выходах и входах других игроков. И после этого будет ждать сообщение в формате “JOIN|
На втором этапе он будет зацикливаться на сообщениях, содержащих файлы от пользователя, и отправлять их всем другим подключенным читателям. Когда цикл снова пытается прочитать сообщение и получает ошибку, это означает, что соединение с пользователем разорвано. После этого функция вызовет метод remove_player
в случае game
который удалит подключение пользователя из трансляции и его никнейм из списка игроков, а также уведомит всех подключенных пользователей об их отключении.
Код диспетчера входа в систему (файловый сервер/mail.rs)
// Файл server/src/main.rs
// ...
async fn process_con(stream: TcpStream, game: Game) {
// Генерируем идентификатор для соединения
let id = rand::thread_rng().gen::<u32>();
// Производим websocket handshake, после чего получаем
// websocket поток или ошибку. В случае ошибки виртуальный
// поток токио оборвется.
let websocket = accept_async(stream).await.unwrap();
// Делим поток вебсокета на Sink и Stream объекты
// для того чтобы можно было читать данные с потока в
// одном потоке, а получать в другом. В нашем случае
// Sink объект (sender) мы отправим броадкастеру, а с
// Stream продолжим работать тут чтоб получать сообщения.
let (mut sender, mut receiver) = websocket.split();
let mut username = String::new();
// Отправляем список игроков пользователю
let _ = sender.send(game.get_list_message()).await;
// Добавляем подключение в список подключений Broadcast
game.add_connection(Connection {
id,
con: sender,
});
// Ждём сообщение входа от пользователя.
// Если приходят любые другие сообщения, кроме текстовых
// формата "JOIN|<username>" – игнорируем их.
// При это длина имени игрока не должна превышать 13 символов.
// Когда валидное сообщение получено, он добавляет игрока в список и
// сообщает всем остальнм о новом участнике.
while let Some(msg) = receiver.next().await {
if let Ok(msg) = msg {
if !msg.is_text() {
continue;
}
if let Ok(data) = msg.clone().into_text() {
let re = Regex::new(r"JOIN\|[A-Za-z0-9]*$").unwrap();
if re.is_match(&data) {
username = (&data[5..]).to_owned();
if username.len() > 13 {
continue;
}
game.add_player(username.clone());
break;
}
}
} else {
game.remove_player(username, id);
return;
}
}
// Обработываем полученные сообщения после входа.
// Все не бинарные сообщение игнорируем, а бинарные парсим и
// проверяем на валидность. Если размер файла больше 5МБ, или
// если в сообщении отправителем указан не тот игрок, что
// записан в соединении – игнорируем. В ином случае, пересылаем
// сообщение всем подключенным игрокам.
while let Some(msg) = receiver.next().await {
if let Ok(msg) = msg {
if !msg.is_binary() {
continue;
}
if let Some(data) = UserFileMessage::from(msg.clone().into_data()) {
if data.file_bytes.len() > 5_000_000 && data.username == username {
continue;
}
game.send_file(data.username, msg);
}
} else {
break;
}
}
// Когда соединение оборвано (цикл закончился), удаляем игрока
// из списка и уведомляем всех остальных об этом.
game.remove_player(username, id);
}
Спасибо за внимание! 🙂 Также клиентскую часть игры можно найти в репозитории проекта по ссылке ниже.
Ссылка на полный код проекта – github.com/IDSaves/filecats