Threads, Networking and Sharing

Changing the Unchangeable

If you're feeling pig-headed (as I get) you wonder if it's ever possible to get around the restrictions of the borrow checker.

Consider the following little program, which compiles and runs without problems.

// cell.rs
use std::cell::Cell;

fn main() {
    let answer = Cell::new(42);

    assert_eq!(answer.get(), 42);

    answer.set(77);

    assert_eq!(answer.get(), 77);
}

The answer was changed - and yet the variable answer was not mutable!

This is obviously perfectly safe, since the value inside the cell is only accessed through set and get. This goes by the grand name of interior mutability. The usual is called inherited mutability: if I have a struct value v, then I can only write to a field v.a if v itself is writeable. Cell values relax this rule, since we can change the value contained within them with set even if the cell itself is not mutable.

However, Cell only works with Copy types (e.g primitive types and user types deriving the Copy trait).

For other values, we have to get a reference we can work on, either mutable or immutable. This is what RefCell provides - you ask it explicitly for a reference to the contained value:

// refcell.rs
use std::cell::RefCell;

fn main() {
    let greeting = RefCell::new("hello".to_string());

    assert_eq!(*greeting.borrow(), "hello");
    assert_eq!(greeting.borrow().len(), 5);

    *greeting.borrow_mut() = "hola".to_string();

    assert_eq!(*greeting.borrow(), "hola");
}

Again, greeting was not declared as mutable!

The explicit dereference operator * can be a bit confusing in Rust, because often you don't need it - for instance greeting.borrow().len() is fine since method calls will dereference implicitly. But you do need * to pull out the underlying &String from greeting.borrow() or the &mut String from greeting.borrow_mut().

Using a RefCell isn't always safe, because any references returned from these methods must follow the usual rules.

# #![allow(unused_variables)]
# 
#fn main() {
    let mut gr = greeting.borrow_mut(); // gr is a mutable borrow
    *gr = "hola".to_string();

    assert_eq!(*greeting.borrow(), "hola"); // <== we blow up here!
....
thread 'main' panicked at 'already mutably borrowed: BorrowError'

#}

You cannot borrow immutably if you have already borrowed mutably! Except - and this is important - the violation of the rules happens at runtime. The solution (as always) is to keep the scope of mutable borrows as limited as possible - in this case, you could put a block around the first two lines here so that the mutable reference gr gets dropped before we borrow again.

So, this is not a feature you use without good reason, since you will not get a compile-time error. These types provide dynamic borrowing in cases where the usual rules make some things impossible.

Shared References

Up to now, the relationship between a value and its borrowed references has been clear and known at compile time. The value is the owner, and the references cannot outlive it. But many cases simply don't fit into this neat pattern. For example, say we have a Player struct and a Role struct. A Player keeps a vector of references to Role objects. There isn't a neat one-to-one relationship between these values, and persuading rustc to cooperate becomes nasty.

Rc works like Box - heap memory is allocated and the value is moved to it. If you clone a Box, it allocates a full cloned copy of the value. But cloning an Rc is cheap, because each time you clone it just updates a reference count to the same data. This is an old and very popular strategy for memory management, for example it's used in the Objective C runtime on iOS/MacOS. (In modern C++, it is implemented with std::shared_ptr.)

When a Rc is dropped, the reference count is decremented. When that count goes to zero the owned value is dropped and the memory freed.

// rc1.rs
use std::rc::Rc;

fn main() {
    let s = "hello dolly".to_string();
    let rs1 = Rc::new(s); // s moves to heap; ref count 1
    let rs2 = rs1.clone(); // ref count 2

    println!("len {}, {}", rs1.len(), rs2.len());
} // both rs1 and rs2 drop, string dies.

You may make as many references as you like to the original value - it's dynamic borrowing again. You do not have to carefully track the relationship between the value T and its references &T. There is some runtime cost involved, so it isn't the first solution you choose, but it makes patterns of sharing possible which would fall foul of the borrow checker. Note that Rc gives you immutable shared references, since otherwise that would break one of the very basic rules of borrowing. A leopard can't change its spots without ceasing to be a leopard.

In the case of a Player, it can now keep its roles as a Vec<Rc<Role>> and things work out fine - we can add or remove roles but not change them after their creation.

However, what if each Player needs to keep references to a team as a vector of Player references? Then everything becomes immutable, because all the Player values need to be stored as Rc! This is the place where RefCell becomes necessary. The team may be then defined as Vec<Rc<RefCell<Player>>>. It is now possible to change a Player value using borrow_mut, provided no-one has 'checked out' a reference to a Player at the same time. For example, say we have a rule that if something special happens to a player, then all of their team gets stronger:

# #![allow(unused_variables)]
# 
#fn main() {
    for p in &self.team {
        p.borrow_mut().make_stronger();
    }

#}

So the application code isn't too bad, but the type signatures get a bit scary. You can always simplify them with a type alias:

# #![allow(unused_variables)]
# 
#fn main() {
type PlayerRef = Rc<RefCell<Player>>;

#}

Multithreading

Over the last twenty years, there has been a shift away from raw processing speed to CPUs having multiple cores. So the only way to get the most out of a modern computer is to keep all of those cores busy. It's certainly possible to spawn child processes in the background as we saw with Command but there's still a synchronization problem: we don't know exactly when those children are finished without waiting on them.

There are other reasons for needing separate threads of execution, of course. You cannot afford to lock up your whole process just to wait on blocking i/o, for instance.

Spawning threads is straightforward in Rust - you feed spawn a closure which is executed in the background.

// thread1.rs
use std::thread;
use std::time;

fn main() {
    thread::spawn(|| println!("hello"));
    thread::spawn(|| println!("dolly"));

    println!("so fine");
    // wait a little bit
    thread::sleep(time::Duration::from_millis(100));
}
// so fine
// hello
// dolly

Well obviously just 'wait a little bit' is not a very rigorous solution! It's better to call join on the returned object - then the main thread waits for the spawned thread to finish.

// thread2.rs
use std::thread;

fn main() {
    let t = thread::spawn(|| {
        println!("hello");
    });
    println!("wait {:?}", t.join());
}
// hello
// wait Ok(())

Here's an interesting variation: force the new thread to panic.

# #![allow(unused_variables)]
# 
#fn main() {
    let t = thread::spawn(|| {
        println!("hello");
        panic!("I give up!");
    });
    println!("wait {:?}", t.join());

#}

We get a panic as expected, but only the panicking thread dies! We still manage to print out the error message from the join. So yes, panics are not always fatal, but threads are relatively expensive, so this should not be seen as a routine way of handling panics.

hello
thread '<unnamed>' panicked at 'I give up!', thread2.rs:7
note: Run with `RUST_BACKTRACE=1` for a backtrace.
wait Err(Any)

The returned objects can be used to keep track of multiple threads:

// thread4.rs
use std::thread;

fn main() {
    let mut threads = Vec::new();

    for i in 0..5 {
        let t = thread::spawn(move || {
            println!("hello {}", i);
        });
        threads.push(t);
    }

    for t in threads {
        t.join().expect("thread failed");
    }
}
// hello 0
// hello 2
// hello 4
// hello 3
// hello 1

Rust insists that we handle the case where the join failed - i.e. that thread panicked. (You would typically not bail out of the main program when this happens, just note the error, retry etc)

There is no particular order to thread execution (this program gives different orders for different runs), and this is key - they really are independent threads of execution. Multithreading is easy; what's hard is concurrency - managing and synchronizing multiple threads of execution.

Threads Don't Borrow

It's possible for the thread closure to capture values, but by moving, not by borrowing!

// thread3.rs
use std::thread;

fn main() {
    let name = "dolly".to_string();
    let t = thread::spawn(|| {
        println!("hello {}", name);
    });
    println!("wait {:?}", t.join());
}

And here's the helpful error message:

error[E0373]: closure may outlive the current function, but it borrows `name`, which is owned by the current function
 --> thread3.rs:6:27
  |
6 |     let t = thread::spawn(|| {
  |                           ^^ may outlive borrowed value `name`
7 |         println!("hello {}", name);
  |                             ---- `name` is borrowed here
  |
help: to force the closure to take ownership of `name` (and any other referenced variables), use the `move` keyword, as shown:
  |     let t = thread::spawn(move || {

That's fair enough! Imagine spawning this thread from a function - it will exist after the function call has finished and name gets dropped. So adding move solves our problem.

But this is a move, so name may only appear in one thread! I'd like to emphasize that it is possible to share references, but they need to have static lifetime:

# #![allow(unused_variables)]
# 
#fn main() {
let name = "dolly";
let t1 = thread::spawn(move || {
    println!("hello {}", name);
});
let t2 = thread::spawn(move || {
    println!("goodbye {}", name);
});

#}

name exists for the whole duration of the program (static), so rustc is satisfied that the closure will never outlive name. However, most interesting references do not have static lifetimes!

Threads can't share the same environment - by design in Rust. In particular, they cannot share regular references because the closures move their captured variables.

shared references are fine however, because their lifetime is 'as long as needed' - but you cannot use Rc for this. This is because Rc is not thread safe - it's optimized to be fast for the non-threaded case. Fortunately it is a compile error to use Rc here; the compiler is watching your back as always.

For threads, you need std::sync::Arc - 'Arc' stands for 'Atomic Reference Counting'. That is, it guarantees that the reference count will be modified in one logical operation. To make this guarantee, it must ensure that the operation is locked so that only the current thread has access. clone is still much cheaper than actually making a copy however.

// thread5.rs
use std::thread;
use std::sync::Arc;

struct MyString(String);

impl MyString {
    fn new(s: &str) -> MyString {
        MyString(s.to_string())
    }
}

fn main() {
    let mut threads = Vec::new();
    let name = Arc::new(MyString::new("dolly"));

    for i in 0..5 {
        let tname = name.clone();
        let t = thread::spawn(move || {
            println!("hello {} count {}", tname.0, i);
        });
        threads.push(t);
    }

    for t in threads {
        t.join().expect("thread failed");
    }
}

I"ve deliberately created a wrapper type for String here (a 'newtype') since our MyString does not implement Clone. But the shared reference can be cloned!

The shared reference name is passed to each new thread by making a new reference with clone and moving it into the closure. It's a little verbose, but this is a safe pattern. Safety is important in concurrency precisely because the problems are so unpredictable. A program may run fine on your machine, but occasionally crash on the server, usually on the weekend. Worse still, the symptoms of such problems are not easy to diagnose.

Channels

There are ways to send data between threads. This is done in Rust using channels. std::sync::mpsc::channel() returns a tuple consisting of the receiver channel and the sender channel. Each thread is passed a copy of the sender with clone, and calls send. Meanwhile the main thread calls recv on the receiver.

'MPSC' stands for 'Multiple Producer Single Consumer'. We create multiple threads which attempt to send to the channel, and the main thread 'consumes' the channel.

// thread9.rs
use std::thread;
use std::sync::mpsc;

fn main() {
    let nthreads = 5;
    let (tx, rx) = mpsc::channel();

    for i in 0..nthreads {
        let tx = tx.clone();
        thread::spawn(move || {
            let response = format!("hello {}", i);
            tx.send(response).unwrap();
        });
    }

    for _ in 0..nthreads {
        println!("got {:?}", rx.recv());
    }
}
// got Ok("hello 0")
// got Ok("hello 1")
// got Ok("hello 3")
// got Ok("hello 4")
// got Ok("hello 2")

There's no need to join here since the threads send their response just before they end execution, but obviously this can happen at any time. recv will block, and will return an error if the sender channel is disconnected. recv_timeout will only block for a given time period, and may return a timeout error as well.

send never blocks, which is useful because threads can push out data without waiting for the receiver to process. In addition, the channel is buffered so multiple send operations can take place, which will be received in order.

However, not blocking means that Ok does not automatically mean 'successfully delivered message'!

A sync_channel does block on send. With an argument of zero, the send blocks until the recv happens. The threads must meet up or rendezvous (on the sound principle that most things sound better in French.)

# #![allow(unused_variables)]
# 
#fn main() {
    let (tx, rx) = mpsc::sync_channel(0);

    let t1 = thread::spawn(move || {
        for i in 0..5 {
            tx.send(i).unwrap();
        }
    });

    for _ in 0..5 {
        let res = rx.recv().unwrap();
        println!("{}",res);
    }
    t1.join().unwrap();

#}

We can easily cause an error here by calling recv when there has been no corresponding send, e.g by looping for i in 0..4. The thread ends, and tx drops, and then recv will fail. This will also happen if the thread panics, which causes its stack to be unwound, dropping any values.

If the sync_channel was created with a non-zero argument n, then it acts like a queue with a maximum size of n - send will only block when it tries to add more than n values to the queue.

Channels are strongly typed - here the channel had type i32 - but type inference makes this implicit. If you need to pass different kinds of data, then enums are a good way to express this.

Synchronization

Let's look at synchronization. join is very basic, and merely waits until a particular thread has finished. A sync_channel synchronizes two threads - in the last example, the spawned thread and the main thread are completely locked together.

Barrier synchronization is a checkpoint where the threads must wait until all of them have reached that point. Then they can keep going as before. The barrier is created with the number of threads that we want to wait for. As before we use use Arc to share the barrier with all the threads.

// thread7.rs
use std::thread;
use std::sync::Arc;
use std::sync::Barrier;

fn main() {
    let nthreads = 5;
    let mut threads = Vec::new();
    let barrier = Arc::new(Barrier::new(nthreads));

    for i in 0..nthreads {
        let barrier = barrier.clone();
        let t = thread::spawn(move || {
            println!("before wait {}", i);
            barrier.wait();
            println!("after wait {}", i);
        });
        threads.push(t);
    }

    for t in threads {
        t.join().unwrap();
    }
}
// before wait 2
// before wait 0
// before wait 1
// before wait 3
// before wait 4
// after wait 4
// after wait 2
// after wait 3
// after wait 0
// after wait 1

The threads do their semi-random thing, all meet up, and then continue. It's like a kind of resumable join and useful when you need to farm off pieces of a job to different threads and want to take some action when all the pieces are finished.

Shared State

How can threads modify shared state?

Recall the Rc<RefCell<T>> strategy for dynamically doing a mutable borrow on shared references. The threading equivalent to RefCell is Mutex - you may get your mutable reference by calling lock. While this reference exists, no other thread can access it. mutex stands for 'Mutual Exclusion' - we lock a section of code so that only one thread can access it, and then unlock it. You get the lock with the lock method, and it is unlocked when the reference is dropped.

// thread9.rs
use std::thread;
use std::sync::Arc;
use std::sync::Mutex;

fn main() {
    let answer = Arc::new(Mutex::new(42));

    let answer_ref = answer.clone();
    let t = thread::spawn(move || {
        let mut answer = answer_ref.lock().unwrap();
        *answer = 55;
    });

    t.join().unwrap();

    let ar = answer.lock().unwrap();
    assert_eq!(*ar, 55);

}

This isn't so straightforward as using RefCell because asking for the lock on the mutex might fail, if another thread has panicked while holding the lock. (In this case, the documentation actually recommends just exiting the thread with unwrap because things have gone seriously wrong!)

It's even more important to keep this mutable borrow as short as possible, because as long as the mutex is locked, other threads are blocked. This is not the place for expensive calculations! So typically such code would be used like this:

# #![allow(unused_variables)]
# 
#fn main() {
// ... do something in the thread
// get a locked reference and use it briefly!
{
    let mut data = data_ref.lock().unwrap();
    // modify data
}
//... continue with the thread

#}

Higher-Level Operations

It's better to find higher-level ways of doing threading, rather than managing the synchronization yourself. An example is when you need to do things in parallel and collect the results. One very cool crate is pipeliner which has a very straightforward API. Here's the 'Hello, World!' - an iterator feeds us inputs and we execute up to n of the operations on the values in parallel.

extern crate pipeliner;
use pipeliner::Pipeline;

fn main() {
    for result in (0..10).with_threads(4).map(|x| x + 1) {
        println!("result: {}", result);
    }
}
// result: 1
// result: 2
// result: 5
// result: 3
// result: 6
// result: 7
// result: 8
// result: 9
// result: 10
// result: 4

It's a silly example of course, because the operation is so cheap to calculate, but shows how easy it is to run code in parallel.

Here's something more useful. Doing network operations in parallel is very useful, because they can take time, and you don't want to wait for them all to finish before starting to do work.

This example is pretty crude (believe me, there are better ways of doing it) but here we want to focus on the principle. We reuse the shell function defined in section 4 to call ping on a range of IP4 addresses.

extern crate pipeliner;
use pipeliner::Pipeline;

use std::process::Command;

fn shell(cmd: &str) -> (String,bool) {
    let cmd = format!("{} 2>&1",cmd);
    let output = Command::new("/bin/sh")
        .arg("-c")
        .arg(&cmd)
        .output()
        .expect("no shell?");
    (
        String::from_utf8_lossy(&output.stdout).trim_right().to_string(),
        output.status.success()
    )
}

fn main() {
    let addresses: Vec<_> = (1..40).map(|n| format!("ping -c1 192.168.0.{}",n)).collect();
    let n = addresses.len();

    for result in addresses.with_threads(n).map(|s| shell(&s)) {
        if result.1 {
            println!("got: {}", result.0);
        }
    }

}

And the result on my home network looks like this:

got: PING 192.168.0.1 (192.168.0.1) 56(84) bytes of data.
64 bytes from 192.168.0.1: icmp_seq=1 ttl=64 time=43.2 ms

--- 192.168.0.1 ping statistics ---
1 packets transmitted, 1 received, 0% packet loss, time 0ms
rtt min/avg/max/mdev = 43.284/43.284/43.284/0.000 ms
got: PING 192.168.0.18 (192.168.0.18) 56(84) bytes of data.
64 bytes from 192.168.0.18: icmp_seq=1 ttl=64 time=0.029 ms

--- 192.168.0.18 ping statistics ---
1 packets transmitted, 1 received, 0% packet loss, time 0ms
rtt min/avg/max/mdev = 0.029/0.029/0.029/0.000 ms
got: PING 192.168.0.3 (192.168.0.3) 56(84) bytes of data.
64 bytes from 192.168.0.3: icmp_seq=1 ttl=64 time=110 ms

--- 192.168.0.3 ping statistics ---
1 packets transmitted, 1 received, 0% packet loss, time 0ms
rtt min/avg/max/mdev = 110.008/110.008/110.008/0.000 ms
got: PING 192.168.0.5 (192.168.0.5) 56(84) bytes of data.
64 bytes from 192.168.0.5: icmp_seq=1 ttl=64 time=207 ms
...

The active addresses come through pretty fast within the first half-second, and we then wait for the negative results to come in. Otherwise, we would wait for the better part of a minute! You can now proceed to scrape things like ping times from the output, although this would only work on Linux. ping is universal, but the exact output format is different for each platform. To do better we need to use the cross-platform Rust networking API, and so let's move onto Networking.

A Better Way to Resolve Addresses

If you just want availability and not detailed ping statistics, the std::net::ToSocketAddrs trait will do any DNS resolution for you:

use std::net::*;

fn main() {
    for res in "google.com:80".to_socket_addrs().expect("bad") {
        println!("got {:?}", res);
    }
}
// got V4(216.58.223.14:80)
// got V6([2c0f:fb50:4002:803::200e]:80)

It's an iterator because there is often more than one interface associated with a domain - there are both IPV4 and IPV6 interfaces to Google.

So, let's naively use this method to rewrite the pipeliner example. Most networking protocols use both an address and a port:

extern crate pipeliner;
use pipeliner::Pipeline;

use std::net::*;

fn main() {
    let addresses: Vec<_> = (1..40).map(|n| format!("192.168.0.{}:0",n)).collect();
    let n = addresses.len();

    for result in addresses.with_threads(n).map(|s| s.to_socket_addrs()) {
        println!("got: {:?}", result);
    }
}
// got: Ok(IntoIter([V4(192.168.0.1:0)]))
// got: Ok(IntoIter([V4(192.168.0.39:0)]))
// got: Ok(IntoIter([V4(192.168.0.2:0)]))
// got: Ok(IntoIter([V4(192.168.0.3:0)]))
// got: Ok(IntoIter([V4(192.168.0.5:0)]))
// ....

This is much faster than the ping example because it's just checking that the IP address is valid - if we fed it a list of actual domain names the DNS lookup could take some time, hence the importance of parallelism.

Suprisingly, it sort-of Just Works. The fact that everything in the standard library implements Debug is great for exploration as well as debugging. The iterator is returning Result (hence Ok) and in that Result is an IntoIter into a SocketAddr which is an enum with either a ipv4 or a ipv6 address. Why IntoIter? Because a socket may have multiple addresses (e.g. both ipv4 and ipv6).

# #![allow(unused_variables)]
# 
#fn main() {
    for result in addresses.with_threads(n)
        .map(|s| s.to_socket_addrs().unwrap().next().unwrap())
    {
        println!("got: {:?}", result);
    }
// got: V4(192.168.0.1:0)
// got: V4(192.168.0.39:0)
// got: V4(192.168.0.3:0)

#}

This also works, surprisingly enough, at least for our simple example. The first unwrap gets rid of the Result, and then we explicitly pull the first value out of the iterator. The Result will get bad typically when we give a nonsense address (like an address name without a port.)

TCP Client Server

Rust provides a straightforward interface to the most commonly used network protocol, TCP. It is very fault-resistant and is the base on which our networked world is built - packets of data are sent and received, with acknowledgement. By contrast, UDP sends packets out into the wild without acknowledgement - there's a joke that goes "I could tell you a joke about UDP but you might not get it." (Jokes about networking are only funny for a specialized meaning of the word 'funny')

However, error handling is very important with networking, because anything can happen, and will, eventually.

TCP works as a client/server model; the server listens on a address and a particular network port, and the client connects to that server. A connection is established and thereafter the client and server can communicate with a socket.

TcpStream::connect takes anything that can convert into a SocketAddr, in particular the plain strings we have been using.

A simple TCP client in Rust is easy - a TcpStream struct is both readable and writeable. As usual, we have to bring the Read, Write and other std::io traits into scope:

// client.rs
use std::net::TcpStream;
use std::io::prelude::*;

fn main() {
    let mut stream = TcpStream::connect("127.0.0.1:8000").expect("connection failed");

    write!(stream,"hello from the client!\n").expect("write failed");
 }

The server is not much more complicated; we set up a listener and wait for connections. When a client connects, we get a TcpStream on the server side. In this case, we read everything that the client has written into a string.

// server.rs
use std::net::TcpListener;
use std::io::prelude::*;

fn main() {

    let listener = TcpListener::bind("127.0.0.1:8000").expect("could not start server");

    // accept connections and get a TcpStream
    for connection in listener.incoming() {
        match connection {
            Ok(mut stream) => {
                let mut text = String::new();
                stream.read_to_string(&mut text).expect("read failed");
                println!("got '{}'", text);
            }
            Err(e) => { println!("connection failed {}", e); }
        }
    }
}

Here I've chosen a port number moreorless at random, but most ports are assigned some meaning.

Note that both parties have to agree on a protocol - the client expects it can write text to the stream, and the server expects to read text from the stream. If they don't play the same game, then situations can occur where one party is blocked, waiting for bytes that never come.

Error checking is important - network I/O can fail for many reasons, and errors that might appear once in a blue moon on a local filesystem can happen on a regular basis. Someone can trip over the network cable, the other party could crash, and so forth. This little server isn't very robust, because it will fall over on the first read error.

Here is a more solid server that handles the error without failing. It also specifically reads a line from the stream, which is done using io::BufReader to create an io::BufRead on which we can call read_line.

// server2.rs
use std::net::{TcpListener, TcpStream};
use std::io::prelude::*;
use std::io;

fn handle_connection(stream: TcpStream) -> io::Result<()>{
    let mut rdr = io::BufReader::new(stream);
    let mut text = String::new();
    rdr.read_line(&mut text)?;
    println!("got '{}'", text.trim_right());
    Ok(())
}

fn main() {

    let listener = TcpListener::bind("127.0.0.1:8000").expect("could not start server");

    // accept connections and get a TcpStream
    for connection in listener.incoming() {
        match connection {
            Ok(stream) => {
                if let Err(e) = handle_connection(stream) {
                    println!("error {:?}", e);
                }
            }
            Err(e) => { print!("connection failed {}\n", e); }
        }
    }
}

read_line might fail in handle_connection, but the resulting error is safely handled.

One-way communications like this are certainly useful - for instance. a set of services across a network which want to collect their status reports together in one central place. But it's reasonable to expect a polite reply, even if just 'ok'!

A simple example is a basic 'echo' server. The client writes some text ending in a newline to the server, and receives the same text back with a newline - the stream is readable and writeable.

// client_echo.rs
use std::io::prelude::*;
use std::net::TcpStream;

fn main() {
    let mut stream = TcpStream::connect("127.0.0.1:8000").expect("connection failed");
    let msg = "hello from the client!";

    write!(stream,"{}\n", msg).expect("write failed");

    let mut resp = String::new();
    stream.read_to_string(&mut resp).expect("read failed");
    let text = resp.trim_right();
    assert_eq!(msg,text);
}

The server has an interesting twist. Only handle_connection changes:

# #![allow(unused_variables)]
# 
#fn main() {
fn handle_connection(stream: TcpStream) -> io::Result<()>{
    let mut ostream = stream.try_clone()?;
    let mut rdr = io::BufReader::new(stream);
    let mut text = String::new();
    rdr.read_line(&mut text)?;
    ostream.write_all(text.as_bytes())?;
    Ok(())
}

#}

This is a common gotcha with simple two-way socket communication; we want to read a line, so need to feed the readable stream to BufReader - but it consumes the stream! So we have to clone the stream, creating a new struct which refers to the same underlying socket. Then we have happiness.