diff --git a/tests/server.rs b/tests/server.rs index f4ce1168..24b35471 100644 --- a/tests/server.rs +++ b/tests/server.rs @@ -14,7 +14,7 @@ use std::net::{TcpStream, Shutdown, SocketAddr}; use std::io::{self, Read, Write}; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc; -use std::sync::{Arc, Mutex}; +use std::sync::{Arc}; use std::net::{TcpListener as StdTcpListener}; use std::thread; use std::time::Duration; @@ -701,7 +701,7 @@ fn header_connection_close() { } #[test] -fn expect_continue() { +fn expect_continue_sends_100() { let server = serve(); let mut req = connect(server.addr()); server.reply(); @@ -1655,7 +1655,7 @@ impl Drop for Serve { #[derive(Clone)] struct TestService { - tx: Arc>>, + tx: mpsc::Sender, reply: spmc::Receiver, } @@ -1670,51 +1670,52 @@ enum Reply { #[derive(Debug)] enum Msg { - //Head(Request), Chunk(Vec), Error(hyper::Error), End, } impl TestService { - // Box is needed until we can return `impl Future` from a fn fn call(&self, req: Request) -> Box, Error=hyper::Error> + Send> { let tx1 = self.tx.clone(); let tx2 = self.tx.clone(); - let replies = self.reply.clone(); Box::new(req.into_body().for_each(move |chunk| { - tx1.lock().unwrap().send(Msg::Chunk(chunk.to_vec())).unwrap(); + tx1.send(Msg::Chunk(chunk.to_vec())).unwrap(); Ok(()) }).then(move |result| { let msg = match result { Ok(()) => Msg::End, Err(e) => Msg::Error(e), }; - tx2.lock().unwrap().send(msg).unwrap(); + tx2.send(msg).unwrap(); Ok(()) }).map(move |_| { - let mut res = Response::new(Body::empty()); - while let Ok(reply) = replies.try_recv() { - match reply { - Reply::Status(s) => { - *res.status_mut() = s; - }, - Reply::Version(v) => { - *res.version_mut() = v; - }, - Reply::Header(name, value) => { - res.headers_mut().insert(name, value); - }, - Reply::Body(body) => { - *res.body_mut() = body; - }, - Reply::End => break, - } - } - res + TestService::build_reply(replies) })) } + + fn build_reply(replies: spmc::Receiver) -> Response { + let mut res = Response::new(Body::empty()); + while let Ok(reply) = replies.try_recv() { + match reply { + Reply::Status(s) => { + *res.status_mut() = s; + }, + Reply::Version(v) => { + *res.version_mut() = v; + }, + Reply::Header(name, value) => { + res.headers_mut().insert(name, value); + }, + Reply::Body(body) => { + *res.body_mut() = body; + }, + Reply::End => break, + } + } + res + } } const HELLO: &'static str = "hello"; @@ -1806,7 +1807,7 @@ impl ServeOptions { .http1_pipeline_flush(options.pipeline) .serve(move || { let ts = TestService { - tx: Arc::new(Mutex::new(msg_tx.clone())), + tx: msg_tx.clone(), reply: reply_rx.clone(), }; service_fn(move |req| ts.call(req))