test(server): remove unneeded mutex for TestService

This commit is contained in:
Sean McArthur
2018-12-06 11:06:45 -08:00
parent fdd0413418
commit 138b1f8a7c

View File

@@ -14,7 +14,7 @@ use std::net::{TcpStream, Shutdown, SocketAddr};
use std::io::{self, Read, Write}; use std::io::{self, Read, Write};
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc; use std::sync::mpsc;
use std::sync::{Arc, Mutex}; use std::sync::{Arc};
use std::net::{TcpListener as StdTcpListener}; use std::net::{TcpListener as StdTcpListener};
use std::thread; use std::thread;
use std::time::Duration; use std::time::Duration;
@@ -701,7 +701,7 @@ fn header_connection_close() {
} }
#[test] #[test]
fn expect_continue() { fn expect_continue_sends_100() {
let server = serve(); let server = serve();
let mut req = connect(server.addr()); let mut req = connect(server.addr());
server.reply(); server.reply();
@@ -1655,7 +1655,7 @@ impl Drop for Serve {
#[derive(Clone)] #[derive(Clone)]
struct TestService { struct TestService {
tx: Arc<Mutex<mpsc::Sender<Msg>>>, tx: mpsc::Sender<Msg>,
reply: spmc::Receiver<Reply>, reply: spmc::Receiver<Reply>,
} }
@@ -1670,51 +1670,52 @@ enum Reply {
#[derive(Debug)] #[derive(Debug)]
enum Msg { enum Msg {
//Head(Request),
Chunk(Vec<u8>), Chunk(Vec<u8>),
Error(hyper::Error), Error(hyper::Error),
End, End,
} }
impl TestService { impl TestService {
// Box is needed until we can return `impl Future` from a fn
fn call(&self, req: Request<Body>) -> Box<Future<Item=Response<Body>, Error=hyper::Error> + Send> { fn call(&self, req: Request<Body>) -> Box<Future<Item=Response<Body>, Error=hyper::Error> + Send> {
let tx1 = self.tx.clone(); let tx1 = self.tx.clone();
let tx2 = self.tx.clone(); let tx2 = self.tx.clone();
let replies = self.reply.clone(); let replies = self.reply.clone();
Box::new(req.into_body().for_each(move |chunk| { Box::new(req.into_body().for_each(move |chunk| {
tx1.lock().unwrap().send(Msg::Chunk(chunk.to_vec())).unwrap(); tx1.send(Msg::Chunk(chunk.to_vec())).unwrap();
Ok(()) Ok(())
}).then(move |result| { }).then(move |result| {
let msg = match result { let msg = match result {
Ok(()) => Msg::End, Ok(()) => Msg::End,
Err(e) => Msg::Error(e), Err(e) => Msg::Error(e),
}; };
tx2.lock().unwrap().send(msg).unwrap(); tx2.send(msg).unwrap();
Ok(()) Ok(())
}).map(move |_| { }).map(move |_| {
let mut res = Response::new(Body::empty()); TestService::build_reply(replies)
while let Ok(reply) = replies.try_recv() {
match reply {
Reply::Status(s) => {
*res.status_mut() = s;
},
Reply::Version(v) => {
*res.version_mut() = v;
},
Reply::Header(name, value) => {
res.headers_mut().insert(name, value);
},
Reply::Body(body) => {
*res.body_mut() = body;
},
Reply::End => break,
}
}
res
})) }))
} }
fn build_reply(replies: spmc::Receiver<Reply>) -> Response<Body> {
let mut res = Response::new(Body::empty());
while let Ok(reply) = replies.try_recv() {
match reply {
Reply::Status(s) => {
*res.status_mut() = s;
},
Reply::Version(v) => {
*res.version_mut() = v;
},
Reply::Header(name, value) => {
res.headers_mut().insert(name, value);
},
Reply::Body(body) => {
*res.body_mut() = body;
},
Reply::End => break,
}
}
res
}
} }
const HELLO: &'static str = "hello"; const HELLO: &'static str = "hello";
@@ -1806,7 +1807,7 @@ impl ServeOptions {
.http1_pipeline_flush(options.pipeline) .http1_pipeline_flush(options.pipeline)
.serve(move || { .serve(move || {
let ts = TestService { let ts = TestService {
tx: Arc::new(Mutex::new(msg_tx.clone())), tx: msg_tx.clone(),
reply: reply_rx.clone(), reply: reply_rx.clone(),
}; };
service_fn(move |req| ts.call(req)) service_fn(move |req| ts.call(req))