Skip to content

Commit 1379c1e

Browse files
committed
First publication.
0 parents  commit 1379c1e

File tree

10 files changed

+413
-0
lines changed

10 files changed

+413
-0
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
/target

Cargo.toml

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
[package]
2+
name = "async-chat"
3+
version = "0.1.0"
4+
authors = ["Jim Blandy <[email protected]>"]
5+
edition = "2018"
6+
7+
[dependencies]
8+
async-std = { version = "1.7", features = ["unstable"] }
9+
tokio = { version = "0.3", features = ["sync"] }
10+
serde = { version = "1.0", features = ["derive", "rc"] }
11+
serde_json = "1.0"

rustfmt.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
# Lines wider than 80 columns don't fit in the book so well.
2+
max_width = 80

src/bin/client.rs

Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
//# send-commands
2+
use async_std::prelude::*;
3+
use async_chat::utils::{self, ChatResult};
4+
use async_std::io;
5+
use async_std::net;
6+
7+
async fn send_commands(mut to_server: net::TcpStream) -> ChatResult<()> {
8+
println!("Commands:\n\
9+
join GROUP\n\
10+
post GROUP MESSAGE...\n\
11+
Type Control-D (on Unix) or Control-Z (on Windows) \
12+
to close the connection.");
13+
14+
let mut command_lines = io::BufReader::new(io::stdin()).lines();
15+
while let Some(command) = command_lines.next().await {
16+
let command = command?;
17+
let request = match parse_command(&command) {
18+
Some(request) => request,
19+
None => continue,
20+
};
21+
22+
utils::send_as_json(&mut to_server, &request).await?;
23+
to_server.flush().await?;
24+
}
25+
26+
Ok(())
27+
}
28+
//#end
29+
30+
//# client-handle-replies
31+
use async_chat::FromServer;
32+
33+
async fn handle_replies(from_server: net::TcpStream) -> ChatResult<()> {
34+
let buffered = io::BufReader::new(from_server);
35+
let mut reply_stream = utils::receive_as_json(buffered);
36+
37+
while let Some(reply) = reply_stream.next().await {
38+
match reply? {
39+
FromServer::Message { group_name, message } => {
40+
println!("message posted to {}: {}", group_name, message);
41+
}
42+
FromServer::Error(message) => {
43+
println!("error from server: {}", message);
44+
}
45+
}
46+
}
47+
48+
Ok(())
49+
}
50+
//# end
51+
52+
//# client-main
53+
use async_std::task;
54+
55+
fn main() -> ChatResult<()> {
56+
let address = std::env::args().nth(1)
57+
.expect("Usage: client ADDRESS:PORT");
58+
59+
task::block_on(async {
60+
let socket = net::TcpStream::connect(address).await?;
61+
socket.set_nodelay(true)?;
62+
63+
let to_server = send_commands(socket.clone());
64+
let from_server = handle_replies(socket);
65+
66+
from_server.race(to_server).await?;
67+
68+
Ok(())
69+
})
70+
}
71+
//# end
72+
73+
use async_chat::FromClient;
74+
use std::sync::Arc;
75+
76+
/// Parse a line (presumably read from the standard input) as a `Request`.
77+
fn parse_command(line: &str) -> Option<FromClient> {
78+
let (command, rest) = get_next_token(line)?;
79+
if command == "post" {
80+
let (group, rest) = get_next_token(rest)?;
81+
let message = rest.trim_start().to_string();
82+
return Some(FromClient::Post {
83+
group_name: Arc::new(group.to_string()),
84+
message: Arc::new(message),
85+
});
86+
} else if command == "join" {
87+
let (group, rest) = get_next_token(rest)?;
88+
if !rest.trim_start().is_empty() {
89+
return None;
90+
}
91+
return Some(FromClient::Join {
92+
group_name: Arc::new(group.to_string()),
93+
});
94+
} else {
95+
eprintln!("Unrecognized command: {:?}", line);
96+
return None;
97+
}
98+
}
99+
100+
/// Given a string `input`, return `Some((token, rest))`, where `token` is the
101+
/// first run of non-whitespace characters in `input`, and `rest` is the rest of
102+
/// the string. If the string contains no non-whitespace characters, return
103+
/// `None`.
104+
fn get_next_token(mut input: &str) -> Option<(&str, &str)> {
105+
input = input.trim_start();
106+
107+
if input.is_empty() {
108+
return None;
109+
}
110+
111+
match input.find(char::is_whitespace) {
112+
Some(space) => Some((&input[0..space], &input[space..])),
113+
None => Some((input, "")),
114+
}
115+
}

src/bin/server/connection.rs

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
/// Handle a single client's connection.
2+
3+
//# chat-serve
4+
use async_chat::{FromClient, FromServer};
5+
use async_chat::utils::{self, ChatResult};
6+
use async_std::prelude::*;
7+
use async_std::io::BufReader;
8+
use async_std::net::TcpStream;
9+
use async_std::sync::Arc;
10+
11+
use crate::group_table::GroupTable;
12+
13+
pub async fn serve(socket: TcpStream, groups: Arc<GroupTable>)
14+
-> ChatResult<()>
15+
{
16+
let outbound = Arc::new(Outbound::new(socket.clone()));
17+
18+
let buffered = BufReader::new(socket);
19+
let mut from_client = utils::receive_as_json(buffered);
20+
while let Some(request) = from_client.next().await {
21+
let request = request?;
22+
23+
let result = match request {
24+
FromClient::Join { group_name } => {
25+
let group = groups.get_or_create(group_name);
26+
group.join(outbound.clone());
27+
Ok(())
28+
}
29+
30+
FromClient::Post { group_name, message } => {
31+
match groups.get(&group_name) {
32+
Some(group) => {
33+
group.post(message);
34+
Ok(())
35+
}
36+
None => {
37+
Err(format!("Group '{}' does not exist", group_name))
38+
}
39+
}
40+
}
41+
};
42+
43+
if let Err(message) = result {
44+
let report = FromServer::Error(message);
45+
outbound.send(report).await?;
46+
}
47+
}
48+
49+
Ok(())
50+
}
51+
//# end
52+
53+
//# chat-Outbound
54+
use async_std::sync::Mutex;
55+
56+
pub struct Outbound(Mutex<TcpStream>);
57+
58+
impl Outbound {
59+
pub fn new(to_client: TcpStream) -> Outbound {
60+
Outbound(Mutex::new(to_client))
61+
}
62+
63+
pub async fn send(&self, packet: FromServer) -> ChatResult<()> {
64+
let mut guard = self.0.lock().await;
65+
utils::send_as_json(&mut *guard, &packet).await?;
66+
guard.flush().await?;
67+
Ok(())
68+
}
69+
}
70+
//# end

src/bin/server/group.rs

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
//! A chat group.
2+
3+
//# group-type
4+
use async_std::task;
5+
use crate::connection::Outbound;
6+
use std::sync::Arc;
7+
use tokio::sync::broadcast;
8+
9+
pub struct Group {
10+
name: Arc<String>,
11+
sender: broadcast::Sender<Arc<String>>
12+
}
13+
14+
impl Group {
15+
pub fn new(name: Arc<String>) -> Group {
16+
let (sender, _receiver) = broadcast::channel(1000);
17+
Group { name, sender }
18+
}
19+
20+
pub fn join(&self, outbound: Arc<Outbound>) {
21+
let receiver = self.sender.subscribe();
22+
23+
task::spawn(handle_subscriber(self.name.clone(),
24+
receiver,
25+
outbound));
26+
}
27+
28+
pub fn post(&self, message: Arc<String>) {
29+
// This only returns an error when there are no subscribers. A
30+
// connection's outgoing side can exit, dropping its subscription,
31+
// slightly before its incoming side, which may end up trying to send a
32+
// message to an empty group.
33+
let _ignored = self.sender.send(message);
34+
}
35+
}
36+
//# end
37+
38+
//# handle_subscriber
39+
use async_chat::FromServer;
40+
use tokio::sync::broadcast::error::RecvError;
41+
42+
async fn handle_subscriber(group_name: Arc<String>,
43+
mut receiver: broadcast::Receiver<Arc<String>>,
44+
outbound: Arc<Outbound>)
45+
{
46+
loop {
47+
let packet = match receiver.recv().await {
48+
Ok(message) => FromServer::Message {
49+
group_name: group_name.clone(),
50+
message: message.clone(),
51+
},
52+
53+
Err(RecvError::Lagged(n)) => FromServer::Error(
54+
format!("Dropped {} messages from {}.", n, group_name)
55+
),
56+
57+
Err(RecvError::Closed) => break,
58+
};
59+
60+
if outbound.send(packet).await.is_err() {
61+
break;
62+
}
63+
}
64+
}
65+
//# end

src/bin/server/group_table.rs

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
//# group-table
2+
use crate::group::Group;
3+
use std::collections::HashMap;
4+
use std::sync::{Arc, Mutex};
5+
6+
pub struct GroupTable(Mutex<HashMap<Arc<String>, Arc<Group>>>);
7+
8+
impl GroupTable {
9+
pub fn new() -> GroupTable {
10+
GroupTable(Mutex::new(HashMap::new()))
11+
}
12+
13+
pub fn get(&self, name: &String) -> Option<Arc<Group>> {
14+
self.0.lock()
15+
.unwrap()
16+
.get(name)
17+
.cloned()
18+
}
19+
20+
pub fn get_or_create(&self, name: Arc<String>) -> Arc<Group> {
21+
self.0.lock()
22+
.unwrap()
23+
.entry(name.clone())
24+
.or_insert_with(|| Arc::new(Group::new(name)))
25+
.clone()
26+
}
27+
}
28+
//# end
29+

src/bin/server/main.rs

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
//! Asynchronous chat server.
2+
3+
//# server-main
4+
use async_std::prelude::*;
5+
use async_std::{net, task};
6+
use async_chat::utils::ChatResult;
7+
use std::sync::Arc;
8+
9+
mod connection;
10+
mod group;
11+
mod group_table;
12+
13+
fn main() -> ChatResult<()> {
14+
let address = std::env::args().nth(1).expect("Usage: server ADDRESS");
15+
16+
let groups = Arc::new(group_table::GroupTable::new());
17+
18+
task::block_on(async {
19+
let listener = net::TcpListener::bind(address).await?;
20+
21+
let mut new_connections = listener.incoming();
22+
loop {
23+
let socket = new_connections.next().await.unwrap()?;
24+
let groups = groups.clone();
25+
task::spawn(async {
26+
if let Err(error) = connection::serve(socket, groups).await {
27+
eprintln!("Error: {}", error);
28+
}
29+
});
30+
}
31+
})
32+
}
33+
//# end

src/lib.rs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
pub mod utils;
2+
3+
//# request-and-reply
4+
use serde::{Deserialize, Serialize};
5+
use std::sync::Arc;
6+
7+
#[derive(Debug, Deserialize, Serialize, PartialEq)]
8+
pub enum FromClient {
9+
Join { group_name: Arc<String> },
10+
Post {
11+
group_name: Arc<String>,
12+
message: Arc<String>,
13+
},
14+
}
15+
16+
#[derive(Debug, Deserialize, Serialize, PartialEq)]
17+
pub enum FromServer {
18+
Message {
19+
group_name: Arc<String>,
20+
message: Arc<String>,
21+
},
22+
Error(String),
23+
}
24+
//# end

0 commit comments

Comments
 (0)