Всем привет! Это руководство о том, как написать небольшой веб-сервер для многопользовательской мини-игры. Суть игры проста — вы обмениваетесь файлами с другими игроками в небольшом 2D-пространстве. Полный код приложения находится в гитхабвы можете протестировать его на Веб-сайт.

Демо

Начинать

Большинство объяснений описываются как комментарии к коду.

Создайте проект с новой командой груза . После этого добавьте все эти зависимости в файл Cargo.toml.

[dependencies]
# Рантайм для асинхронного кода
tokio = { version = "1.28.1", features = ["full"] }

# Асинхронная версия библиотеки tungstenite, реализующей протокол WebSocket, 
# работающая в связке с библиотекой tokio.
tokio-tungstenite = "0.18.0"

# Вспомогательные методы для работы с массивами байтов. 
bstr = "1.4.0"

# Вспомогательные метода для работы асинхронными объектами.
futures-util = "0.3"

# Библиотека для работы регулярными выражениями.
regex = "1"

# Mutex, RwLock и тд. быстрее чем в стандартной библиотеке.
parking_lot = "0.12.1"

# Генерирует рандомные числа.
rand = "0.8.5"

Основные типы

Состав UserFileMessage – хранит информацию о файле, переданную пользователем, а также имеет метод, позволяющий разобрать эту информацию из байтового массива. Будет использоваться при получении бинарного сообщения (один из типов сообщений протокола WebSocket) от пользователя для проверки его достоверности.

Состав Connection – хранит логин ID, а также сам логин, в который записывается информация и передается вошедшему в систему пользователю.

перечисление BroadcastEvents – содержит список событий, которые могут быть отправлены сразу всеми пользователями. Необходимо разделить по типам сообщения, получаемые вещателем (подробнее ниже).

Код типа (Fileserver/src/types.rs)
// Файл server/src/types.rs

// Трейт с вспомогательными методами для массима байтов 
use bstr::ByteSlice;

// SplitSink это Sink часть разделённого на два объекта потока. 
// Подробнее позже
use futures_util::stream::SplitSink;

// Асинхронный TCP стрим 
use tokio::net::TcpStream;

// Тип WebSocket потока и сообщения из него
use tokio_tungstenite::{tungstenite::Message, WebSocketStream};

pub struct UserFileMessage {
  // Имя пользователя отправившего файл
  pub username: String,

  // Имя файла
  pub file_name: String,

  // Content-Type файла
  pub file_type: String,

  // Сам файл 
  pub file_bytes: Vec<u8>,
}

impl UserFileMessage {
  // Метод из массива байтов собирает структуру UserFileMessage.
  pub fn from(data: Vec<u8>) -> Option<Self> {
    // Паттерн разделителя, который отделяет каждый фрагмент 
    // информации от другого. Например:
    // username<pattern>filename<pattern>filetype<pattern>filebytes
    let pattern: [u8; 12] = [226, 128, 147, 226, 128, 147, 226, 128, 147, 226, 128, 147]; 
    
    // Делит массив по паттерну
    let result: Vec<Vec<u8>> = data.split_str(&pattern).map(|x|x.to_vec()).filter(|x| x.len() > 0).collect();
    
    // Если разделение прошло на 4 элемента, то все прошло успешно. 
    // Если нет, то данные переданы некорректно и нужно возвращать None.
    if result.len() == 4 {
      let username = String::from_utf8_lossy(&result[0]).to_string();
      let file_name = String::from_utf8_lossy(&result[1]).to_string();
      let file_type = String::from_utf8_lossy(&result[2]).to_string();
      
      let file_bytes = result[3].clone();
      
      Some(UserFileMessage { username, file_name, file_type, file_bytes })
    } else {
      None
    }
  }
}

#[derive(Debug)]
pub struct Connection {
  // Рандомный идентификатор соединения
  pub id: u32,

  // Sink часть websocket потока для передачи в него информации
  pub con: SplitSink<WebSocketStream<TcpStream>, Message>,
}

#[derive(Debug)]
pub enum BroadcastEvents {
  // Добавляет подключение в broadcast'ер
  AddConn(Connection),
 
  // Отправка всем участникам сообщения о подключении нового
  Join(String),

  // Отправка всем участникам сообщения о выходе игрока. 
  // Так же передаём первым элементом идентификатор его
  // подключения, чтобы удалить его из списка.
  Quit(u32, String),

  // Отправка всем участникам полученного от одного из них файла.
  SendFile(Message)
}

Транслировать

Вещание – это функция, которая будет работать в отдельном токио Task’е, получать сигналы от других потоков и в зависимости от полученного сигнала отправлять разные сообщения всем подключенным игрокам.

Задача в токио – зеленый (виртуальный) тред. Он управляется средой выполнения tokyo, а не ОС.

Широковещательный код (Fileserver/src/broadcast.rs)
// Файл server/src/broadcast.rs

use std::collections::HashMap;

// Трейт добавляющий удобные методы для Sink объектов (в 
// нашем случае Sink'ом будет, полученная после деления, вторая часть
// потока WebSocketStream, в которую мы будем записывать данные, для 
// передачи их пользователю).
use futures_util::SinkExt;

// Тип получателя для неограниченного mpsc канала
use tokio::sync::mpsc::UnboundedReceiver;

// Тип сообщения, передаваемого по WebSocket'у
use tokio_tungstenite::tungstenite::Message;

use crate::types::{BroadcastEvents, Connection};

pub async fn run(mut rx: UnboundedReceiver<BroadcastEvents>) {
  let mut connections: HashMap<u32, Connection> = HashMap::new();

  // В цикле ждём новый сигнал 
  while let Some(event) = rx.recv().await {
    match event {
      // Добавляем соединение в список для рассылки
      BroadcastEvents::AddConn(conn) => {
        connections.insert(conn.id, conn);
      }
			
	  // Рассылаем всем юзерам сообщение о подключении нового игрока
      // Пример сообщения: JOIN|userA
      BroadcastEvents::Join(username) => {
        for (_, iconn) in connections.iter_mut() {
          let _ = iconn.con.send(Message::Text(format!("JOIN|{}", username))).await;
        }
      }

      // Рассылаем всем юзерам сообщение о выходе игрока
      // Пример сообщения: LEFT|userA
      BroadcastEvents::Quit(id, username) => {
        connections.remove(&id);

        if !username.is_empty() {
          for (_, conn) in connections.iter_mut() {
            let _ = conn.con.send(Message::Text(format!("LEFT|{}", username))).await;
          }
        }
      }

      // Пересылаем юзерам сообщение с файлом.
      // Так как сообщение для пересылки нужно клонировать и при 
	  // большом количестве игроков это может вызвать замедления в работе
      // функции, максимальный размер файла ограничен 5МБ. (Это будет ниже)
      BroadcastEvents::SendFile(msg) => {
        for (_, conn) in connections.iter_mut() {
          let _ = conn.con.send(msg.clone()).await;
        }
      }
    }
  }
}

Состояния игрока

Для управления состоянием игроков создадим отдельную структуру Gameв котором мы будем хранить список подключенных пользователей и отправителя сигналов для трансляции.

ЧИТАТЬ   Главком ВМС Норвегии Андерсен беспокоится о российских подводных лодках

Чтобы предотвратить файловый спам, у нас будет небольшой 15-секундный кулдаун для игроков, чтобы отправить файл. Таким образом, каждому игроку не будет присвоено ничего, кроме его имени и времени последней отправки файла.

Код структуры игры (Fileserver/src/game.rs)
// Файл server/src/game.rs

use std::{collections::HashMap, sync::Arc};

use parking_lot::Mutex;
use tokio::{sync::mpsc::UnboundedSender, time::Instant};
use tokio_tungstenite::tungstenite::Message;

use crate::types::{BroadcastEvents, Connection};

pub struct Player {
  // Время последнего отправленного файла. 
  // Так как, когда пользователь только зашел, он ещё не успел отправить
  // что-нибудь, записывать в это поле нечего. В таком случае будет
  // выставлено значение енама Option – None.
  pub dt_last_send: Option<Instant>,
}

#[derive(Clone)]
pub struct Game {
  // Мапа со всеми подключенными играками (Формат: username:Player).
  // Для того чтобы это поле было доступно для нескольких потоков,
  // его необходимо обернуть в смарт-поинтер Arc, которые реализует
  // множественное владение данными в нескольких потоках. А чтобы 
  // мапу можно было внутри Arc'а ещё и изменять, нужно дополнительно
  // завернуть её в Mutex. Так же Mutex обезопасит данные, ограничив
  // максимальное количество единовременно изменяемых мапу потоков до одного.
  pub players: Arc<Mutex<HashMap<String, Player>>>,

  // Отправитель сигналов в Broadcast
  pub broadcast_sender: UnboundedSender<BroadcastEvents>
}

impl Game {
  // Создаём новый инстанс
  pub fn new(broadcast_sender: UnboundedSender<BroadcastEvents>) -> Game {
    Game { 
      players: Arc::new(Mutex::new(HashMap::new())),
      broadcast_sender,
    }
  }
  
  // Добавляем игрока и отправляем сигнал в Broadcast.
 
  // Вызов метода send вернёт Result, который мы не хотим в данном
  // случае обрабатывать. Поэтому засунем его в игнорируемую переменную.
 
  // Так же проверяем есть ли такой игрок уже на сервере и 
  // не превысит ли количество игроков на сервере 50,
  // если игрок подключится. (Максимум 50 игроков)
  pub fn add_player(&self, username: String) {
    let mut players = self.players.lock();
    if !players.contains_key(&username) && players.len() < 50 {
      players.insert(username.clone(), Player { dt_last_send: None });
      let _ = self.broadcast_sender.send(BroadcastEvents::Join(username.clone()));
    }
  }

  // Добавляем соединение в Broadcast
  pub fn add_connection(&self, conn: Connection) {
    let _ = self.broadcast_sender.send(BroadcastEvents::AddConn(conn));
  }
 
  // Удаляем игрока и отправляем сигнал в Broadcast
  pub fn remove_player(&self, username: String, id: u32) {
    self.players.lock().remove(&username);
    let _ = self.broadcast_sender.send(BroadcastEvents::Quit(id, username));
  }

  // Формируем сообщение со списком пользователей.
  // Пример: LIST|userA,userB,userC
  pub fn get_list_message(&self) -> Message {
    let list_string = self.players.lock().iter().map(|w| w.0.to_owned()).collect::<Vec<String>>().join(",");

    Message::Text(format!("LIST|{}", list_string))
  }
	
  // Отправляем сообщение с файлом в Broadcast.
  // Перед этим проверяем прошел или нет у пользователя 
  // cooldown на отправку файла.
  pub fn send_file(&self, from: String, msg: Message) {
    let mut players = self.players.lock();
    if let Some(player) = players.get(&from) {
      if player.dt_last_send.is_none() || player.dt_last_send.unwrap().elapsed().as_secs() > 15 {
        let _ = self.broadcast_sender.send(BroadcastEvents::SendFile(msg));
        players.insert(from, Player { dt_last_send: Some(Instant::now()) });
      }
    }
  }
}

Получение запросов на подключение

Поскольку WebSocket работает поверх TCP, сначала нам просто нужно создать прослушивающий TCP-сокет, используя метод bind к TcpListener’у, и в цикле принимать все запросы на подключение.

ЧИТАТЬ   Наталья Котова назвала количество челябинцев, отметивших День Победы на городских площадках

Для оптимизации обработки соединений предпочтительнее выводить их в отдельный поток. В нашем случае в Task’у Tokyo.

Код дома main.rs
// Файл server/src/main.rs

// Методы для удобного управления потоком подключения WebSocket
use futures_util::{StreamExt, SinkExt};

use tokio::net::{TcpStream, TcpListener};

// Фунция для принятия WebSocket соединения
use tokio_tungstenite::accept_async;

use tokio::sync::mpsc;

// Структура для работы с регулярными выражениями
use regex::Regex;

// Рандомайзер для чисел
use rand::Rng;

use types::{Connection, BroadcastEvents, UserFileMessage};
use game::Game;

mod types;
mod game;
mod broadcast;

// Обязательно навешиваем на main макрос tokio::main, который
// создаст рантайм токио для исполнения асинхронного кода.
#[tokio::main]
async fn main () {
    // Создаём прослушивающий TCP сокет на порте 8080
	let server = TcpListener::bind("0.0.0.0:8080").await.unwrap();
	
    // Создаём неограниченный канал mpsc
	let (broadcast_sender, broadcast_receiver) = mpsc::unbounded_channel::<BroadcastEvents>();
  
	// Создаём новый поток токио и на выполнение ему передаём нашу 
    // функцию Broadcast.
	tokio::spawn(broadcast::run(broadcast_receiver));
	
	let game = Game::new(broadcast_sender);
	
	// В цикле принимаем все запросы на соединение
	loop {
		let (stream, _) = server.accept().await.unwrap();
        // Обрабатываем все соединения в отдельном токио потоке
		tokio::spawn(process_con(stream, game.clone()));
	}
}

// Функция обработки соединения
async fn process_con(stream: TcpStream, game: Game) {
	println!("Connection!")
}

Управление подключением

Наша функция управления подключением будет работать в 2 этапа.

На первом этапе он отправит пользователю список всех подключенных игроков, добавит подключение в список подключений в вещателе, чтобы подключенный пользователь уже мог получать данные о выходах и входах других игроков. И после этого будет ждать сообщение в формате “JOIN|» от пользователя. Получив его, она перейдет ко второму шагу.

На втором этапе он будет зацикливаться на сообщениях, содержащих файлы от пользователя, и отправлять их всем другим подключенным читателям. Когда цикл снова пытается прочитать сообщение и получает ошибку, это означает, что соединение с пользователем разорвано. После этого функция вызовет метод remove_player в случае gameкоторый удалит подключение пользователя из трансляции и его никнейм из списка игроков, а также уведомит всех подключенных пользователей об их отключении.

ЧИТАТЬ   «Если объект не будет продан до конца мая, цена должна быть снижена». Что происходит с домами в высокий сезон?
Код диспетчера входа в систему (файловый сервер/mail.rs)
// Файл server/src/main.rs

// ...

async fn process_con(stream: TcpStream, game: Game) {
    // Генерируем идентификатор для соединения
	let id = rand::thread_rng().gen::<u32>();
  
	// Производим websocket handshake, после чего получаем 
    // websocket поток или ошибку. В случае ошибки виртуальный 
    // поток токио оборвется.
	let websocket = accept_async(stream).await.unwrap();
	
	// Делим поток вебсокета на Sink и Stream объекты 
    // для того чтобы можно было читать данные с потока в 
    // одном потоке, а получать в другом. В нашем случае 
    // Sink объект (sender) мы отправим броадкастеру, а с 
    // Stream продолжим работать тут чтоб получать сообщения.
	let (mut sender, mut receiver) = websocket.split();

	let mut username = String::new(); 
	
	// Отправляем список игроков пользователю
	let _ = sender.send(game.get_list_message()).await;

	// Добавляем подключение в список подключений Broadcast
	game.add_connection(Connection {
		id,
		con: sender,
	});

	// Ждём сообщение входа от пользователя.
    // Если приходят любые другие сообщения, кроме текстовых
    // формата "JOIN|<username>" – игнорируем их.
    // При это длина имени игрока не должна превышать 13 символов.

    // Когда валидное сообщение получено, он добавляет игрока в список и 
    // сообщает всем остальнм о новом участнике.
	while let Some(msg) = receiver.next().await {
		if let Ok(msg) = msg {
			if !msg.is_text() {
				continue;
			}
			if let Ok(data) = msg.clone().into_text() {
				let re = Regex::new(r"JOIN\|[A-Za-z0-9]*$").unwrap();
				if re.is_match(&data) {
					username = (&data[5..]).to_owned();
					if username.len() > 13 {
						continue;
					}
					game.add_player(username.clone());
					break;
				}
			}
		} else {
			game.remove_player(username, id);
			return;
		}
	}

	// Обработываем полученные сообщения после входа.
    // Все не бинарные сообщение игнорируем, а бинарные парсим и 
    // проверяем на валидность. Если размер файла больше 5МБ, или 
    // если в сообщении отправителем указан не тот игрок, что 
    // записан в соединении – игнорируем. В ином случае, пересылаем 
    // сообщение всем подключенным игрокам.
	while let Some(msg) = receiver.next().await {
		if let Ok(msg) = msg {
			if !msg.is_binary() {
				continue;
			}
			if let Some(data) = UserFileMessage::from(msg.clone().into_data()) {
				if data.file_bytes.len() > 5_000_000 && data.username == username {
					continue;
				}

				game.send_file(data.username, msg);
			}
		} else {
			break;
		}
	}

	// Когда соединение оборвано (цикл закончился), удаляем игрока 
    // из списка и уведомляем всех остальных об этом.
	game.remove_player(username, id);
} 

Спасибо за внимание! 🙂 Также клиентскую часть игры можно найти в репозитории проекта по ссылке ниже.

Ссылка на полный код проекта – github.com/IDSaves/filecats

Source

От admin