//! A collection of traits abstracting over Listeners and Streams. use std::any::{Any, TypeId}; use std::fmt; use std::old_io::{IoResult, IoError, ConnectionAborted, InvalidInput, OtherIoError, Stream, Listener, Acceptor}; use std::old_io::net::ip::{SocketAddr, ToSocketAddr, Port}; use std::old_io::net::tcp::{TcpStream, TcpListener, TcpAcceptor}; use std::mem; use std::raw::{self, TraitObject}; use std::sync::Arc; use uany::UnsafeAnyExt; use openssl::ssl::{Ssl, SslStream, SslContext}; use openssl::ssl::SslVerifyMode::SslVerifyNone; use openssl::ssl::SslMethod::Sslv23; use openssl::ssl::error::{SslError, StreamError, OpenSslErrors, SslSessionClosed}; use openssl::x509::X509FileType; macro_rules! try_some { ($expr:expr) => (match $expr { Some(val) => { return Err(val); }, _ => {} }) } /// The write-status indicating headers have not been written. #[allow(missing_copy_implementations)] pub struct Fresh; /// The write-status indicating headers have been written. #[allow(missing_copy_implementations)] pub struct Streaming; /// An abstraction to listen for connections on a certain port. pub trait NetworkListener { /// Type of Acceptor type Acceptor: NetworkAcceptor; /// Listens on a socket. fn listen(&mut self, addr: To) -> IoResult; } /// An abstraction to receive `NetworkStream`s. pub trait NetworkAcceptor: Clone + Send { /// Type of Stream to receive type Stream: NetworkStream + Send + Clone; /// Returns an iterator of streams. fn accept(&mut self) -> IoResult; /// Get the address this Listener ended up listening on. fn socket_name(&self) -> IoResult; /// Closes the Acceptor, so no more incoming connections will be handled. fn close(&mut self) -> IoResult<()>; /// Returns an iterator over incoming connections. fn incoming(&mut self) -> NetworkConnections { NetworkConnections(self) } } /// An iterator wrapper over a NetworkAcceptor. pub struct NetworkConnections<'a, N: NetworkAcceptor + 'a>(&'a mut N); impl<'a, N: NetworkAcceptor> Iterator for NetworkConnections<'a, N> { type Item = IoResult; fn next(&mut self) -> Option> { Some(self.0.accept()) } } /// An abstraction over streams that a Server can utilize. pub trait NetworkStream: Stream + Any + StreamClone + Send { /// Get the remote address of the underlying connection. fn peer_name(&mut self) -> IoResult; } #[doc(hidden)] pub trait StreamClone { fn clone_box(&self) -> Box; } impl StreamClone for T { #[inline] fn clone_box(&self) -> Box { Box::new(self.clone()) } } /// A connector creates a NetworkStream. pub trait NetworkConnector { /// Type of Stream to create type Stream: NetworkStream + Send; /// Connect to a remote address. fn connect(&mut self, host: &str, port: Port, scheme: &str) -> IoResult; } impl fmt::Debug for Box { fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { fmt.pad("Box") } } impl Clone for Box { #[inline] fn clone(&self) -> Box { self.clone_box() } } impl Reader for Box { #[inline] fn read(&mut self, buf: &mut [u8]) -> IoResult { (**self).read(buf) } } impl Writer for Box { #[inline] fn write_all(&mut self, msg: &[u8]) -> IoResult<()> { (**self).write_all(msg) } #[inline] fn flush(&mut self) -> IoResult<()> { (**self).flush() } } impl<'a> Reader for &'a mut NetworkStream { #[inline] fn read(&mut self, buf: &mut [u8]) -> IoResult { (**self).read(buf) } } impl<'a> Writer for &'a mut NetworkStream { #[inline] fn write_all(&mut self, msg: &[u8]) -> IoResult<()> { (**self).write_all(msg) } #[inline] fn flush(&mut self) -> IoResult<()> { (**self).flush() } } impl UnsafeAnyExt for NetworkStream { unsafe fn downcast_ref_unchecked(&self) -> &T { mem::transmute(mem::transmute::<&NetworkStream, raw::TraitObject>(self).data) } unsafe fn downcast_mut_unchecked(&mut self) -> &mut T { mem::transmute(mem::transmute::<&mut NetworkStream, raw::TraitObject>(self).data) } unsafe fn downcast_unchecked(self: Box) -> Box { mem::transmute(mem::transmute::, raw::TraitObject>(self).data) } } impl NetworkStream { /// Is the underlying type in this trait object a T? #[inline] pub fn is(&self) -> bool { self.get_type_id() == TypeId::of::() } /// If the underlying type is T, get a reference to the contained data. #[inline] pub fn downcast_ref(&self) -> Option<&T> { if self.is::() { Some(unsafe { self.downcast_ref_unchecked() }) } else { None } } /// If the underlying type is T, get a mutable reference to the contained /// data. #[inline] pub fn downcast_mut(&mut self) -> Option<&mut T> { if self.is::() { Some(unsafe { self.downcast_mut_unchecked() }) } else { None } } /// If the underlying type is T, extract it. pub fn downcast(self: Box) -> Result, Box> { if self.is::() { Ok(unsafe { self.downcast_unchecked() }) } else { Err(self) } } } /// A `NetworkListener` for `HttpStream`s. #[allow(missing_copy_implementations)] pub enum HttpListener { /// Http variant. Http, /// Https variant. The two paths point to the certificate and key PEM files, in that order. Https(Path, Path), } impl NetworkListener for HttpListener { type Acceptor = HttpAcceptor; #[inline] fn listen(&mut self, addr: To) -> IoResult { let mut tcp = try!(TcpListener::bind(addr)); let addr = try!(tcp.socket_name()); Ok(match *self { HttpListener::Http => HttpAcceptor::Http(try!(tcp.listen()), addr), HttpListener::Https(ref cert, ref key) => { let mut ssl_context = try!(SslContext::new(Sslv23).map_err(lift_ssl_error)); try_some!(ssl_context.set_cipher_list("DEFAULT").map(lift_ssl_error)); try_some!(ssl_context.set_certificate_file( cert, X509FileType::PEM).map(lift_ssl_error)); try_some!(ssl_context.set_private_key_file( key, X509FileType::PEM).map(lift_ssl_error)); ssl_context.set_verify(SslVerifyNone, None); HttpAcceptor::Https(try!(tcp.listen()), addr, Arc::new(ssl_context)) } }) } } /// A `NetworkAcceptor` for `HttpStream`s. #[derive(Clone)] pub enum HttpAcceptor { /// Http variant. Http(TcpAcceptor, SocketAddr), /// Https variant. Https(TcpAcceptor, SocketAddr, Arc), } impl NetworkAcceptor for HttpAcceptor { type Stream = HttpStream; #[inline] fn accept(&mut self) -> IoResult { Ok(match *self { HttpAcceptor::Http(ref mut tcp, _) => HttpStream::Http(try!(tcp.accept())), HttpAcceptor::Https(ref mut tcp, _, ref ssl_context) => { let stream = try!(tcp.accept()); match SslStream::::new_server(&**ssl_context, stream) { Ok(ssl_stream) => HttpStream::Https(ssl_stream), Err(StreamError(ref e)) => { return Err(IoError { kind: ConnectionAborted, desc: "SSL Handshake Interrupted", detail: Some(e.desc.to_string()) }); }, Err(e) => return Err(lift_ssl_error(e)) } } }) } #[inline] fn close(&mut self) -> IoResult<()> { match *self { HttpAcceptor::Http(ref mut tcp, _) => tcp.close_accept(), HttpAcceptor::Https(ref mut tcp, _, _) => tcp.close_accept(), } } #[inline] fn socket_name(&self) -> IoResult { match *self { HttpAcceptor::Http(_, addr) => Ok(addr), HttpAcceptor::Https(_, addr, _) => Ok(addr), } } } /// A wrapper around a TcpStream. #[derive(Clone)] pub enum HttpStream { /// A stream over the HTTP protocol. Http(TcpStream), /// A stream over the HTTP protocol, protected by SSL. Https(SslStream), } impl Reader for HttpStream { #[inline] fn read(&mut self, buf: &mut [u8]) -> IoResult { match *self { HttpStream::Http(ref mut inner) => inner.read(buf), HttpStream::Https(ref mut inner) => inner.read(buf) } } } impl Writer for HttpStream { #[inline] fn write_all(&mut self, msg: &[u8]) -> IoResult<()> { match *self { HttpStream::Http(ref mut inner) => inner.write_all(msg), HttpStream::Https(ref mut inner) => inner.write_all(msg) } } #[inline] fn flush(&mut self) -> IoResult<()> { match *self { HttpStream::Http(ref mut inner) => inner.flush(), HttpStream::Https(ref mut inner) => inner.flush(), } } } impl NetworkStream for HttpStream { fn peer_name(&mut self) -> IoResult { match *self { HttpStream::Http(ref mut inner) => inner.peer_name(), HttpStream::Https(ref mut inner) => inner.get_mut().peer_name() } } } /// A connector that will produce HttpStreams. #[allow(missing_copy_implementations)] pub struct HttpConnector<'v>(pub Option>); /// A method that can set verification methods on an SSL context pub type ContextVerifier<'v> = Box ()+'v>; impl<'v> NetworkConnector for HttpConnector<'v> { type Stream = HttpStream; fn connect(&mut self, host: &str, port: Port, scheme: &str) -> IoResult { let addr = (host, port); match scheme { "http" => { debug!("http scheme"); Ok(HttpStream::Http(try!(TcpStream::connect(addr)))) }, "https" => { debug!("https scheme"); let stream = try!(TcpStream::connect(addr)); let mut context = try!(SslContext::new(Sslv23).map_err(lift_ssl_error)); if let Some(ref mut verifier) = self.0 { verifier(&mut context); } let ssl = try!(Ssl::new(&context).map_err(lift_ssl_error)); try!(ssl.set_hostname(host).map_err(lift_ssl_error)); let stream = try!(SslStream::new(&context, stream).map_err(lift_ssl_error)); Ok(HttpStream::Https(stream)) }, _ => { Err(IoError { kind: InvalidInput, desc: "Invalid scheme for Http", detail: None }) } } } } fn lift_ssl_error(ssl: SslError) -> IoError { debug!("lift_ssl_error: {:?}", ssl); match ssl { StreamError(err) => err, SslSessionClosed => IoError { kind: ConnectionAborted, desc: "SSL Connection Closed", detail: None }, // Unfortunately throw this away. No way to support this // detail without a better Error abstraction. OpenSslErrors(errs) => IoError { kind: OtherIoError, desc: "Error in OpenSSL", detail: Some(format!("{:?}", errs)) } } } #[cfg(test)] mod tests { use uany::UnsafeAnyExt; use mock::MockStream; use super::NetworkStream; #[test] fn test_downcast_box_stream() { let stream = box MockStream::new() as Box; let mock = stream.downcast::().ok().unwrap(); assert_eq!(mock, box MockStream::new()); } #[test] fn test_downcast_unchecked_box_stream() { let stream = box MockStream::new() as Box; let mock = unsafe { stream.downcast_unchecked::() }; assert_eq!(mock, box MockStream::new()); } }