From d78eff6918b4e639e17f5f3abde4d582b0de43da Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Sun, 16 Oct 2016 09:40:28 -0700 Subject: [PATCH] improvements --- examples/simple.rs | 4 +++- src/body.rs | 37 +++++++++++++++++++++++++++++++++++++ src/client.rs | 26 +++++++++++++++++++------- src/error.rs | 14 ++++++++++++++ src/lib.rs | 11 +++++++++-- src/sync.rs | 34 ++++++++++++++++++++++------------ 6 files changed, 104 insertions(+), 22 deletions(-) create mode 100644 src/body.rs create mode 100644 src/error.rs diff --git a/examples/simple.rs b/examples/simple.rs index 0bde65e..55a852e 100644 --- a/examples/simple.rs +++ b/examples/simple.rs @@ -4,7 +4,9 @@ extern crate env_logger; fn main() { env_logger::init().unwrap(); - let mut res = request::get("https://rust-lang.org").unwrap(); + println!("GET https://www.rust-lang.org"); + + let mut res = request::get("https://www.rust-lang.org").unwrap(); println!("Status: {}", res.status()); println!("Headers:\n{}", res.headers()); diff --git a/src/body.rs b/src/body.rs new file mode 100644 index 0000000..93255a9 --- /dev/null +++ b/src/body.rs @@ -0,0 +1,37 @@ +use std::io::Read; + +pub struct Body(Kind); + +impl Body { + pub fn sized(reader: (), len: u64) -> Body { + unimplemented!() + } + + pub fn chunked(reader: ()) -> Body { + unimplemented!() + } +} + +enum Kind { + Length, + Chunked +} + +impl From> for Body { + #[inline] + fn from(v: Vec) -> Body { + unimplemented!() + } +} + +impl From for Body { + #[inline] + fn from(s: String) -> Body { + s.into_bytes().into() + } +} + +/// Wraps a `std::io::Write`. +pub struct Pipe(Kind); + + diff --git a/src/client.rs b/src/client.rs index 7447a4e..949740b 100644 --- a/src/client.rs +++ b/src/client.rs @@ -3,7 +3,8 @@ use std::io::{self, Read}; use hyper::{Method, StatusCode, HttpVersion, Url}; use hyper::header::Headers; -use super::sync; +use ::body::Body; +use ::sync; pub struct Client { inner: sync::Client, @@ -42,20 +43,31 @@ pub struct RequestBuilder<'a> { version: HttpVersion, headers: Headers, - body: Option<()>, + body: Option, } impl<'a> RequestBuilder<'a> { - pub fn body(mut self, body: ()) -> RequestBuilder<'a> { - self.body = Some(body); + + pub fn header(mut self, header: H) -> RequestBuilder<'a> { + self.headers.set(header); self } - pub fn send(mut self) -> Result { - self.headers.set(::hyper::header::ContentLength(0)); + pub fn headers(mut self, headers: ::header::Headers) -> RequestBuilder<'a> { + self.headers.extend(headers.iter()); + self + } + + pub fn body>(mut self, body: T) -> RequestBuilder<'a> { + self.body = Some(body.into()); + self + } + + pub fn send(mut self) -> ::Result { + self.headers.set(::header::ContentLength(0)); let req = try!(self.client.inner.request(self.method, self.url, self.version, self.headers)); - let res = try!(req.end().map_err(|e| format!("RequestError: end: {}", e))); + let res = try!(req.end()); Ok(Response { inner: res }) diff --git a/src/error.rs b/src/error.rs new file mode 100644 index 0000000..1463612 --- /dev/null +++ b/src/error.rs @@ -0,0 +1,14 @@ +#[derive(Debug)] +pub enum Error { + Http(::hyper::Error), + #[doc(hidden)] + __DontMatchMe, +} + +impl From<::hyper::Error> for Error { + fn from(err: ::hyper::Error) -> Error { + Error::Http(err) + } +} + +pub type Result = ::std::result::Result; diff --git a/src/lib.rs b/src/lib.rs index d426897..a7f3f6b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,14 +1,21 @@ +#![allow(warnings)] + + extern crate hyper; #[macro_use] extern crate log; pub use hyper::{Method, StatusCode, header, Url}; -pub use self::client::{Client, Response}; +pub use self::client::{Client, Response}; +pub use self::error::{Error, Result}; + +mod body; mod client; +mod error; mod sync; -pub fn get(url: &str) -> Result { +pub fn get(url: &str) -> ::Result { let client = Client::new(); client.get(url).send() } diff --git a/src/sync.rs b/src/sync.rs index 8188efa..3408ce3 100644 --- a/src/sync.rs +++ b/src/sync.rs @@ -18,7 +18,7 @@ impl Client { } } - pub fn request(&self, method: Method, url: Url, version: HttpVersion, headers: Headers) -> Result { + pub fn request(&self, method: Method, url: Url, version: HttpVersion, headers: Headers) -> ::Result { let (ctrl_tx, ctrl_rx) = mpsc::channel(); let (res_tx, res_rx) = mpsc::channel(); let (action_tx, rx) = mpsc::channel(); @@ -26,7 +26,7 @@ impl Client { let timeout = Duration::from_secs(10); - try!(self.inner.request(url, SynchronousHandler { + self.inner.request(url, SynchronousHandler { read_timeout: timeout, write_timeout: timeout, @@ -37,29 +37,32 @@ impl Client { reading: None, writing: None, request: Some((method, version, headers)), - }).map_err(|e| format!("RequestError: {}", e))); + }).ok().expect("client dropped early"); + + // connecting + let ctrl = try!(ctrl_rx.recv().expect("ctrl_rx dropped early")); Ok(Request { res_rx: res_rx, tx: action_tx, rx: action_rx, - ctrl: try!(ctrl_rx.recv().map_err(|e| format!("RequestError: waiting for Control: {}", e))), + ctrl: ctrl, }) } } pub struct Request { - res_rx: mpsc::Receiver, + res_rx: mpsc::Receiver<::hyper::Result>, tx: mpsc::Sender, rx: mpsc::Receiver>, ctrl: hyper::Control, } impl Request { - pub fn end(self) -> Result { + pub fn end(self) -> ::Result { trace!("Request.end"); self.ctrl.ready(Next::read()).unwrap(); - let res = try!(self.res_rx.recv().map_err(|e| format!("RequestError: end = {}", e))); + let res = try!(self.res_rx.recv().expect("res_tx dropped early")); Ok(Response { status: res.status().clone(), headers: res.headers().clone(), @@ -108,8 +111,8 @@ struct SynchronousHandler { read_timeout: Duration, write_timeout: Duration, - ctrl_tx: mpsc::Sender, - res_tx: mpsc::Sender, + ctrl_tx: mpsc::Sender<::hyper::Result>, + res_tx: mpsc::Sender<::hyper::Result>, tx: mpsc::Sender>, rx: mpsc::Receiver, reading: Option<(*mut u8, usize)>, @@ -189,7 +192,7 @@ impl hyper::client::Handler for SynchronousHand fn on_response(&mut self, res: hyper::client::Response) -> Next { trace!("on_response {:?}", res); - if let Err(_) = self.res_tx.send(res) { + if let Err(_) = self.res_tx.send(Ok(res)) { return Next::end(); } self.next() @@ -206,15 +209,20 @@ impl hyper::client::Handler for SynchronousHand self.next() } + fn on_error(&mut self, err: ::hyper::Error) -> Next { + debug!("on_error {:?}", err); + let _ = self.ctrl_tx.send(Err(err)); + Next::remove() + } + fn on_control(&mut self, ctrl: Control) { - self.ctrl_tx.send(ctrl).unwrap(); + let _ = self.ctrl_tx.send(Ok(ctrl)); } } enum Action { Read(*mut u8, usize), Write(*const u8, usize), - //Request(Method, RequestUri, HttpVersion, Headers), } unsafe impl Send for Action {} @@ -228,6 +236,8 @@ mod tests { #[test] fn test_get() { + extern crate env_logger; + env_logger::init().unwrap(); let server = TcpListener::bind("127.0.0.1:0").unwrap(); let addr = server.local_addr().unwrap(); thread::spawn(move || {