1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
use error::ErrorKind;
use util::BufStream;
use util::http::{HttpService, NewHttpService};
use futures::Poll;
use http;
use hyper;
use hyper::body::{Body, Chunk, Payload};
use hyper::server::conn::Http;
use hyper::service::Service as HyperService;
use util::buf_stream::size_hint::{Builder, SizeHint};
use tokio;
use tokio::net::TcpListener;
use tokio::prelude::*;
use std::io;
use std::net::SocketAddr;
use std::sync::Arc;
/// Adapter around an `HttpService`.
///
/// The `HyperService` implementation for this type (in this file) forwards
/// each hyper request to `inner`.
struct Lift<T>
where
    T: HttpService,
{
    /// The wrapped service that actually handles requests.
    inner: T,
}
struct LiftBody<T: HttpService> {
body: T::ResponseBody,
}
/// Request body type handed to the wrapped `HttpService`.
///
/// Wraps a hyper `Body`; the `BufStream` implementation in this file polls the
/// inner body and yields hyper `Chunk`s.
#[derive(Debug)]
pub struct LiftReqBody {
// The hyper request body being adapted.
body: Body,
}
impl<T: HttpService<RequestBody = LiftReqBody>> Lift<T> {
    /// Wraps `inner` in a `Lift` adapter.
    fn new(inner: T) -> Self {
        Self { inner: inner }
    }
}
impl<T> Payload for LiftBody<T>
where
T: HttpService + 'static,
<T::ResponseBody as BufStream>::Item: Send,
T::ResponseBody: Send,
{
type Data = <T::ResponseBody as BufStream>::Item;
type Error = ::Error;
fn poll_data(&mut self) -> Poll<Option<Self::Data>, Self::Error> {
self.body.poll()
.map_err(|_| unimplemented!())
}
}
impl BufStream for LiftReqBody {
type Item = Chunk;
type Error = ::Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, ::Error> {
Stream::poll(&mut self.body).map_err(|_| ErrorKind::internal().into())
}
}
impl<T> HyperService for Lift<T>
where
T: HttpService<RequestBody = LiftReqBody> + 'static,
<T::ResponseBody as BufStream>::Item: Send,
T::ResponseBody: Send,
T::Future: Send,
{
type ReqBody = Body;
type ResBody = LiftBody<T>;
type Error = ::Error;
type Future = Box<Future<Item = http::Response<Self::ResBody>, Error = Self::Error> + Send>;
fn call(&mut self, request: http::Request<Self::ReqBody>) -> Self::Future {
let request = request.map(|body| LiftReqBody { body });
let response = self.inner
.call_http(request)
.map(|response| response.map(|body| LiftBody { body }))
.map_err(|_| unimplemented!())
;
Box::new(response)
}
}
/// Binds a TCP listener on `addr` and serves HTTP on every accepted socket.
///
/// For each accepted connection a new service is requested from
/// `new_service`, wrapped in `Lift`, and served on a task started with
/// `tokio::spawn`. The accept loop itself is driven by `tokio::run`, so this
/// function does not return `Ok(())` until `tokio::run` returns.
///
/// Failures while accepting, while creating a service, or while serving a
/// connection are printed to stdout.
///
/// # Errors
///
/// Returns the `io::Error` from `TcpListener::bind` if the listener cannot be
/// bound to `addr`.
pub fn run<T>(addr: &SocketAddr, new_service: T) -> io::Result<()>
where
    T: NewHttpService<RequestBody = LiftReqBody> + Send + 'static,
    T::Future: Send,
    <T::ResponseBody as BufStream>::Item: Send,
    T::ResponseBody: Send,
    T::Service: Send,
    <T::Service as HttpService>::Future: Send,
{
    let listener = TcpListener::bind(addr)?;
    // One shared hyper protocol configuration for all connections.
    let http = Arc::new(Http::new());

    // NOTE(review): an accept error is mapped to `()` and then surfaces as the
    // error of `for_each`, which presumably ends the accept loop — confirm
    // whether the server should keep accepting after a transient failure.
    let server = listener
        .incoming()
        .map_err(|e| println!("failed to accept socket; err = {:?}", e))
        .for_each(move |socket| {
            let h = http.clone();

            let connection = new_service
                .new_http_service()
                // The service-creation error has no `Debug` bound here, so
                // only the fact of the failure can be reported. The connection
                // is dropped rather than panicking the accept loop.
                .map_err(|_| println!("failed to create service for connection"))
                .and_then(move |service| {
                    h.serve_connection(socket, Lift::new(service))
                        .map(|_| ())
                        .map_err(|e| {
                            println!("failed to serve connection; err={:?}", e);
                        })
                });

            tokio::spawn(connection)
        });

    tokio::run(server);
    Ok(())
}
impl BufStream for Body {
type Item = Chunk;
type Error = hyper::Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
Stream::poll(self)
}
fn size_hint(&self) -> SizeHint {
let mut builder = Builder::new();
if let Some(length) = self.content_length() {
if length < usize::max_value() as u64 {
let length = length as usize;
builder.lower(length).upper(length);
} else {
builder.lower(usize::max_value());
}
}
builder.build()
}
}