From 15e59c1c6da550df26822ebb0b308196d251c0bf Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Sun, 16 Oct 2016 10:26:04 -0700 Subject: [PATCH] use current hyper for now --- Cargo.toml | 6 +- examples/simple.rs | 4 +- src/client.rs | 22 ++-- src/lib.rs | 7 +- src/sync.rs | 255 --------------------------------------------- 5 files changed, 23 insertions(+), 271 deletions(-) delete mode 100644 src/sync.rs diff --git a/Cargo.toml b/Cargo.toml index a7bff3e..cff9122 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,11 +1,11 @@ [package] -name = "request" -version = "0.1.0" +name = "reqwest" +version = "0.0.0" authors = ["Sean McArthur "] license = "MIT/Apache-2.0" [dependencies] -hyper = {git = "https://github.com/hyperium/hyper"} +hyper = { version = "0.9" }#, default-features = false } log = "0.3" [dev-dependencies] diff --git a/examples/simple.rs b/examples/simple.rs index 55a852e..5ad1601 100644 --- a/examples/simple.rs +++ b/examples/simple.rs @@ -1,4 +1,4 @@ -extern crate request; +extern crate reqwest; extern crate env_logger; fn main() { @@ -6,7 +6,7 @@ fn main() { println!("GET https://www.rust-lang.org"); - let mut res = request::get("https://www.rust-lang.org").unwrap(); + let mut res = reqwest::get("http://www.rust-lang.org").unwrap(); println!("Status: {}", res.status()); println!("Headers:\n{}", res.headers()); diff --git a/src/client.rs b/src/client.rs index 949740b..999cd7d 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,19 +1,21 @@ use std::io::{self, Read}; -use hyper::{Method, StatusCode, HttpVersion, Url}; use hyper::header::Headers; +use hyper::method::Method; +use hyper::status::StatusCode; +use hyper::version::HttpVersion; +use hyper::{Url}; use ::body::Body; -use ::sync; pub struct Client { - inner: sync::Client, + inner: ::hyper::Client, } impl Client { pub fn new() -> Client { Client { - inner: sync::Client::new(), + inner: ::hyper::Client::new(), } } @@ -48,7 +50,7 @@ pub struct RequestBuilder<'a> { impl<'a> RequestBuilder<'a> { - pub fn header(mut self, header: H) -> RequestBuilder<'a> { + pub fn header(mut self, header: H) -> RequestBuilder<'a> { self.headers.set(header); self } @@ -64,10 +66,12 @@ impl<'a> RequestBuilder<'a> { } pub fn send(mut self) -> ::Result { - self.headers.set(::header::ContentLength(0)); - let req = try!(self.client.inner.request(self.method, self.url, self.version, self.headers)); + let req = self.client.inner.request(self.method, self.url) + .headers(self.headers); - let res = try!(req.end()); + //TODO: body + + let res = try!(req.send()); Ok(Response { inner: res }) @@ -75,7 +79,7 @@ impl<'a> RequestBuilder<'a> { } pub struct Response { - inner: sync::Response, + inner: ::hyper::client::Response, } impl Response { diff --git a/src/lib.rs b/src/lib.rs index a7f3f6b..47333f0 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -5,7 +5,11 @@ extern crate hyper; #[macro_use] extern crate log; -pub use hyper::{Method, StatusCode, header, Url}; +pub use hyper::header; +pub use hyper::method::Method; +pub use hyper::status::StatusCode; +pub use hyper::version::HttpVersion; +pub use hyper::Url; pub use self::client::{Client, Response}; pub use self::error::{Error, Result}; @@ -13,7 +17,6 @@ pub use self::error::{Error, Result}; mod body; mod client; mod error; -mod sync; pub fn get(url: &str) -> ::Result { let client = Client::new(); diff --git a/src/sync.rs b/src/sync.rs deleted file mode 100644 index 3408ce3..0000000 --- a/src/sync.rs +++ /dev/null @@ -1,255 +0,0 @@ -use std::io::{self, Read, Write}; -use std::sync::mpsc; -use std::time::Duration; - -use hyper::{self, Control, Next, Method, StatusCode, HttpVersion, RequestUri, Url}; -use hyper::header::Headers; - -pub struct Client { - inner: hyper::Client, -} - -impl Client { - pub fn new() -> Client { - Client { - inner: hyper::Client::::configure() - .connect_timeout(Duration::from_secs(10)) - .build().unwrap(), - } - } - - pub fn request(&self, method: Method, url: Url, version: HttpVersion, headers: Headers) -> ::Result { - let (ctrl_tx, ctrl_rx) = mpsc::channel(); - let (res_tx, res_rx) = mpsc::channel(); - let (action_tx, rx) = mpsc::channel(); - let (tx, action_rx) = mpsc::channel(); - - let timeout = Duration::from_secs(10); - - self.inner.request(url, SynchronousHandler { - read_timeout: timeout, - write_timeout: timeout, - - ctrl_tx: ctrl_tx, - res_tx: res_tx, - tx: tx, - rx: rx, - reading: None, - writing: None, - request: Some((method, version, headers)), - }).ok().expect("client dropped early"); - - // connecting - let ctrl = try!(ctrl_rx.recv().expect("ctrl_rx dropped early")); - - Ok(Request { - res_rx: res_rx, - tx: action_tx, - rx: action_rx, - ctrl: ctrl, - }) - } -} - -pub struct Request { - res_rx: mpsc::Receiver<::hyper::Result>, - tx: mpsc::Sender, - rx: mpsc::Receiver>, - ctrl: hyper::Control, -} - -impl Request { - pub fn end(self) -> ::Result { - trace!("Request.end"); - self.ctrl.ready(Next::read()).unwrap(); - let res = try!(self.res_rx.recv().expect("res_tx dropped early")); - Ok(Response { - status: res.status().clone(), - headers: res.headers().clone(), - version: res.version().clone(), - - tx: self.tx, - rx: self.rx, - ctrl: self.ctrl, - }) - } -} - -impl Write for Request { - fn write(&mut self, msg: &[u8]) -> io::Result { - self.tx.send(Action::Write(msg.as_ptr(), msg.len())).unwrap(); - self.ctrl.ready(Next::write()).unwrap(); - let res = self.rx.recv().unwrap(); - res - } - - fn flush(&mut self) -> io::Result<()> { - panic!("Request.flush() not implemented") - } -} - -pub struct Response { - pub headers: Headers, - pub status: StatusCode, - pub version: HttpVersion, - - tx: mpsc::Sender, - rx: mpsc::Receiver>, - ctrl: hyper::Control, - -} - -impl Read for Response { - fn read(&mut self, buf: &mut [u8]) -> io::Result { - self.tx.send(Action::Read(buf.as_mut_ptr(), buf.len())).unwrap(); - self.ctrl.ready(Next::read()).unwrap(); - self.rx.recv().unwrap() - } -} - -struct SynchronousHandler { - read_timeout: Duration, - write_timeout: Duration, - - ctrl_tx: mpsc::Sender<::hyper::Result>, - res_tx: mpsc::Sender<::hyper::Result>, - tx: mpsc::Sender>, - rx: mpsc::Receiver, - reading: Option<(*mut u8, usize)>, - writing: Option<(*const u8, usize)>, - request: Option<(hyper::Method, hyper::HttpVersion, hyper::Headers)> -} - -unsafe impl Send for SynchronousHandler {} - -impl SynchronousHandler { - fn next(&mut self) -> Next { - match self.rx.try_recv() { - Ok(Action::Read(ptr, len)) => { - self.reading = Some((ptr, len)); - Next::read().timeout(self.read_timeout) - }, - Ok(Action::Write(ptr, len)) => { - self.writing = Some((ptr, len)); - Next::write().timeout(self.write_timeout) - } - Err(mpsc::TryRecvError::Empty) => { - // we're too fast, the other thread hasn't had a chance to respond - Next::wait() - } - Err(mpsc::TryRecvError::Disconnected) => { - // they dropped it - Next::end() - } - } - } - - fn reading(&mut self) -> Option<(*mut u8, usize)> { - self.reading.take().or_else(|| { - match self.rx.try_recv() { - Ok(Action::Read(ptr, len)) => { - Some((ptr, len)) - }, - _ => None - } - }) - } - - fn writing(&mut self) -> Option<(*const u8, usize)> { - self.writing.take().or_else(|| { - match self.rx.try_recv() { - Ok(Action::Write(ptr, len)) => { - Some((ptr, len)) - }, - _ => None - } - }) - } -} - -impl hyper::client::Handler for SynchronousHandler { - fn on_request(&mut self, req: &mut hyper::client::Request) -> Next { - use std::iter::Extend; - let head = self.request.take().unwrap(); - trace!("on_request {:?}", head); - req.set_method(head.0); - //req.set_uri(head.1); - req.headers_mut().extend(head.2.iter()); - self.next() - - } - - fn on_request_writable(&mut self, encoder: &mut hyper::Encoder) -> Next { - trace!("on_request_writable"); - if let Some(raw) = self.writing() { - let slice = unsafe { ::std::slice::from_raw_parts(raw.0, raw.1) }; - if self.tx.send(encoder.write(slice)).is_err() { - return Next::end(); - } - } - self.next() - } - - fn on_response(&mut self, res: hyper::client::Response) -> Next { - trace!("on_response {:?}", res); - if let Err(_) = self.res_tx.send(Ok(res)) { - return Next::end(); - } - self.next() - } - - fn on_response_readable(&mut self, decoder: &mut hyper::Decoder) -> Next { - trace!("on_response_readable"); - if let Some(raw) = self.reading() { - let slice = unsafe { ::std::slice::from_raw_parts_mut(raw.0, raw.1) }; - if self.tx.send(decoder.read(slice)).is_err() { - return Next::end(); - } - } - self.next() - } - - fn on_error(&mut self, err: ::hyper::Error) -> Next { - debug!("on_error {:?}", err); - let _ = self.ctrl_tx.send(Err(err)); - Next::remove() - } - - fn on_control(&mut self, ctrl: Control) { - let _ = self.ctrl_tx.send(Ok(ctrl)); - } -} - -enum Action { - Read(*mut u8, usize), - Write(*const u8, usize), -} - -unsafe impl Send for Action {} - - -#[cfg(test)] -mod tests { - use std::io::{Read, Write}; - use std::net::TcpListener; - use std::thread; - - #[test] - fn test_get() { - extern crate env_logger; - env_logger::init().unwrap(); - let server = TcpListener::bind("127.0.0.1:0").unwrap(); - let addr = server.local_addr().unwrap(); - thread::spawn(move || { - let mut inc = server.accept().unwrap().0; - let mut buf = [0; 4096]; - inc.read(&mut buf).unwrap(); - }); - - let mut res = super::super::get(&format!("http://{}", addr)).unwrap(); - assert_eq!(res.status(), &::hyper::Ok); - - let mut buf = Vec::new(); - res.read_to_end(&mut buf).unwrap(); - } -}