pavex/server/server.rs
1use std::future::Future;
2use std::net::SocketAddr;
3
4use crate::connection::ConnectionInfo;
5use crate::server::configuration::ServerConfiguration;
6use crate::server::server_handle::ServerHandle;
7
8use super::IncomingStream;
9
10/// An HTTP server to handle incoming connections for Pavex applications.
11/// It handles both HTTP1 and HTTP2 connections.
12///
13/// # Example
14///
15/// ```rust
16/// use std::net::SocketAddr;
17/// use pavex::server::Server;
18///
19/// # #[derive(Clone)] struct ApplicationState;
20/// # async fn router(_req: hyper::Request<hyper::body::Incoming>, _conn_info: Option<pavex::connection::ConnectionInfo>, _state: ApplicationState) -> pavex::Response { todo!() }
21/// # async fn t() -> std::io::Result<()> {
22/// # let application_state = ApplicationState;
23/// let addr = SocketAddr::from(([127, 0, 0, 1], 8080));
24///
25/// Server::new()
26/// .bind(addr)
27/// .await?
28/// // Both the routing function and the application state will usually
29/// // be code-generated by Pavex, starting from your `Blueprint`.
30/// // You don't have to define them manually!
31/// .serve(router, application_state)
32/// // The `serve` method returns a `ServerHandle` that you can use to
33/// // interact with the server.
34/// // Calling `.await` on the handle lets you wait until the server
35/// // shuts down.
36/// .await;
37/// # Ok(())
38/// # }
39/// ```
40///
41/// # Configuration
42///
43/// [`Server::new`] returns a new [`Server`] with default configuration.
44/// You can customize the server default settings by creating your own [`ServerConfiguration`]
45/// and invoking [`Server::set_config`].
46///
47/// # Architecture
48///
49/// By default, [`Server::serve`] creates a worker per CPU core and distributes connection from an
50/// acceptor thread using a round-robin strategy.
51///
52/// Each worker has its own single-threaded [`tokio`] runtime—there is no work stealing across
53/// workers.
54/// Each worker takes care to invoke your routing and request handling logic, with the help
55/// of [`hyper`].
56#[must_use = "You must call `serve` on a `Server` to start listening for incoming connections"]
57pub struct Server {
58 config: ServerConfiguration,
59 incoming: Vec<IncomingStream>,
60}
61
62impl Default for Server {
63 fn default() -> Self {
64 Self::new()
65 }
66}
67
68impl Server {
69 /// Create a new [`Server`] with default configuration.
70 pub fn new() -> Self {
71 Self {
72 config: ServerConfiguration::default(),
73 incoming: Vec::new(),
74 }
75 }
76
77 /// Configure this [`Server`] according to the values set in the [`ServerConfiguration`]
78 /// passed as input parameter.
79 /// It will overwrite any previous configuration set on this [`Server`].
80 ///
81 /// If you want to retrieve the current configuration, use [`Server::get_config`].
82 pub fn set_config(mut self, config: ServerConfiguration) -> Self {
83 self.config = config;
84 self
85 }
86
87 /// Get a reference to the [`ServerConfiguration`] for this [`Server`].
88 ///
89 /// If you want to overwrite the existing configuration, use [`Server::set_config`].
90 pub fn get_config(&self) -> &ServerConfiguration {
91 &self.config
92 }
93
94 /// Bind the server to the given address: the server will accept incoming connections from this
95 /// address when started.
96 /// Binding an address may fail (e.g. if the address is already in use), therefore this method
97 /// may return an error.
98 ///
99 /// # Related
100 ///
101 /// Check out [`Server::listen`] for an alternative binding mechanism as well as a
102 /// discussion of the pros and cons of [`Server::bind`] vs [`Server::listen`].
103 ///
104 /// # Note
105 ///
106 /// A [`Server`] can be bound to multiple addresses: just call this method multiple times with
107 /// all the addresses you want to bind to.
108 ///
109 /// # Example: bind one address
110 ///
111 /// ```rust
112 /// use std::net::SocketAddr;
113 /// use pavex::server::Server;
114 ///
115 /// # async fn t() -> std::io::Result<()> {
116 /// let addr = SocketAddr::from(([127, 0, 0, 1], 8080));
117 ///
118 /// Server::new()
119 /// .bind(addr)
120 /// .await?
121 /// # ;
122 /// // [...]
123 /// # Ok(())
124 /// # }
125 /// ```
126 ///
127 /// # Example: bind multiple addresses
128 ///
129 /// ```rust
130 /// use std::net::SocketAddr;
131 /// use pavex::server::Server;
132 ///
133 /// # async fn t() -> std::io::Result<()> {
134 /// let addr1 = SocketAddr::from(([127, 0, 0, 1], 8080));
135 /// let addr2 = SocketAddr::from(([127, 0, 0, 1], 4000));
136 ///
137 /// Server::new()
138 /// .bind(addr1)
139 /// .await?
140 /// .bind(addr2)
141 /// .await?
142 /// # ;
143 /// // [...]
144 /// # Ok(())
145 /// # }
146 /// ````
147 pub async fn bind(mut self, addr: SocketAddr) -> std::io::Result<Self> {
148 let incoming = IncomingStream::bind(addr).await?;
149 self.incoming.push(incoming);
150 Ok(self)
151 }
152
153 /// Ask the server to process incoming connections from the provided [`IncomingStream`].
154 ///
155 /// # [`Server::listen`] vs [`Server::bind`]
156 ///
157 /// [`Server::bind`] only requires you to specify the address you want to listen at. The
158 /// socket configuration is handled by the [`Server`], with a set of reasonable default
159 /// parameters. You have no access to the [`IncomingStream`] that gets bound to the address
160 /// you specified.
161 ///
162 /// [`Server::listen`], instead, expects an [`IncomingStream`].
163 /// You are free to configure the socket as you see please and the [`Server`] will just
164 /// poll it for incoming connections.
165 /// It also allows you to interact with the bound [`IncomingStream`] directly
166 ///
167 /// # Example: bind to a random port
168 ///
169 /// ```rust
170 /// use std::net::SocketAddr;
171 /// use pavex::server::{IncomingStream, Server};
172 ///
173 /// # async fn t() -> std::io::Result<()> {
174 /// // `0` is a special port: it tells the OS to assign us
175 /// // a random **unused** port
176 /// let addr = SocketAddr::from(([127, 0, 0, 1], 0));
177 /// let incoming = IncomingStream::bind(addr).await?;
178 /// // We can then retrieve the actual port we were assigned
179 /// // by the OS.
180 /// let addr = incoming.local_addr()?.to_owned();
181 ///
182 /// Server::new()
183 /// .listen(incoming);
184 /// # ;
185 /// // [...]
186 /// # Ok(())
187 /// # }
188 /// ````
189 ///
190 /// # Example: set a custom socket backlog
191 ///
192 /// ```rust
193 /// use std::net::SocketAddr;
194 /// use socket2::Domain;
195 /// use pavex::server::{IncomingStream, Server};
196 ///
197 /// # async fn t() -> std::io::Result<()> {
198 /// // `0` is a special port: it tells the OS to assign us
199 /// // a random **unused** port
200 /// let addr = SocketAddr::from(([127, 0, 0, 1], 0));
201 ///
202 /// let socket = socket2::Socket::new(
203 /// Domain::for_address(addr),
204 /// socket2::Type::STREAM,
205 /// Some(socket2::Protocol::TCP),
206 /// )
207 /// .expect("Failed to create a socket");
208 /// socket.set_reuse_address(true)?;
209 /// socket.set_nonblocking(true)?;
210 /// socket.bind(&addr.into())?;
211 /// // The custom backlog!
212 /// socket.listen(2048_i32)?;
213 ///
214 /// let listener = std::net::TcpListener::from(socket);
215 /// Server::new()
216 /// .listen(listener.try_into()?)
217 /// # ;
218 /// // [...]
219 /// # Ok(())
220 /// # }
221 /// ````
222 ///
223 /// # Note
224 ///
225 /// A [`Server`] can listen to multiple streams of incoming connections: just call this method
226 /// multiple times!
227 pub fn listen(mut self, incoming: IncomingStream) -> Self {
228 self.incoming.push(incoming);
229 self
230 }
231
232 /// Start listening for incoming connections.
233 ///
234 /// You must specify:
235 ///
236 /// - a handler function, which will be called for each incoming request;
237 /// - the application state, the set of singleton components that will be available to
238 /// your handler function.
239 ///
240 /// Both the handler function and the application state are usually code-generated by Pavex
241 /// starting from your [`Blueprint`](crate::Blueprint).
242 ///
243 /// # Wait for the server to shut down
244 ///
245 /// `serve` returns a [`ServerHandle`].
246 /// Calling `.await` on the handle lets you wait until the server shuts down.
247 ///
248 /// # Panics
249 ///
250 /// This method will panic if the [`Server`] has no registered sources of incoming connections,
251 /// i.e. if you did not call [`Server::bind`] or [`Server::listen`] before calling `serve`.
252 ///
253 /// If you'd rather handle the error, use [`Server::try_serve`] instead.
254 pub fn serve<HandlerFuture, ApplicationState>(
255 self,
256 handler: fn(
257 http::Request<hyper::body::Incoming>,
258 Option<ConnectionInfo>,
259 ApplicationState,
260 ) -> HandlerFuture,
261 application_state: ApplicationState,
262 ) -> ServerHandle
263 where
264 HandlerFuture: Future<Output = crate::Response> + 'static,
265 ApplicationState: Clone + Send + Sync + 'static,
266 {
267 self.try_serve(handler, application_state).unwrap()
268 }
269
270 /// A fallible version of [`Server::serve`].
271 ///
272 /// It will return an error, rather than panicking, if the [`Server`] has no registered sources
273 /// of incoming connections, i.e. if you did not call [`Server::bind`] or [`Server::listen`]
274 /// before calling `serve`.
275 pub fn try_serve<HandlerFuture, ApplicationState>(
276 self,
277 handler: fn(
278 http::Request<hyper::body::Incoming>,
279 Option<ConnectionInfo>,
280 ApplicationState,
281 ) -> HandlerFuture,
282 application_state: ApplicationState,
283 ) -> Result<ServerHandle, std::io::Error>
284 where
285 HandlerFuture: Future<Output = crate::Response> + 'static,
286 ApplicationState: Clone + Send + Sync + 'static,
287 {
288 if self.incoming.is_empty() {
289 let err_msg = "Cannot serve: there is no source of incoming connections. You must call `bind` or `listen` on the `Server` instance before invoking `serve`.";
290 return Err(std::io::Error::other(err_msg));
291 }
292 Ok(ServerHandle::new(
293 self.config,
294 self.incoming,
295 handler,
296 application_state,
297 ))
298 }
299}