pavex/server/
server.rs

1use std::future::Future;
2use std::net::SocketAddr;
3
4use crate::connection::ConnectionInfo;
5use crate::server::configuration::ServerConfiguration;
6use crate::server::server_handle::ServerHandle;
7
8use super::IncomingStream;
9
10/// An HTTP server to handle incoming connections for Pavex applications.
11/// It handles both HTTP1 and HTTP2 connections.
12///
13/// # Example
14///
15/// ```rust
16/// use std::net::SocketAddr;
17/// use pavex::server::Server;
18///
19/// # #[derive(Clone)] struct ApplicationState;
20/// # async fn router(_req: hyper::Request<hyper::body::Incoming>, _conn_info: Option<pavex::connection::ConnectionInfo>, _state: ApplicationState) -> pavex::Response { todo!() }
21/// # async fn t() -> std::io::Result<()> {
22/// # let application_state = ApplicationState;
23/// let addr = SocketAddr::from(([127, 0, 0, 1], 8080));
24///
25/// Server::new()
26///     .bind(addr)
27///     .await?
28///     // Both the routing function and the application state will usually
29///     // be code-generated by Pavex, starting from your `Blueprint`.
30///     // You don't have to define them manually!
31///     .serve(router, application_state)
32///     // The `serve` method returns a `ServerHandle` that you can use to
33///     // interact with the server.
34///     // Calling `.await` on the handle lets you wait until the server
35///     // shuts down.
36///     .await;
37/// # Ok(())
38/// # }
39/// ```
40///
41/// # Configuration
42///
43/// [`Server::new`] returns a new [`Server`] with default configuration.
44/// You can customize the server default settings by creating your own [`ServerConfiguration`]
45/// and invoking [`Server::set_config`].
46///
47/// # Architecture
48///
49/// By default, [`Server::serve`] creates a worker per CPU core and distributes connection from an
50/// acceptor thread using a round-robin strategy.
51///
52/// Each worker has its own single-threaded [`tokio`] runtime—there is no work stealing across
53/// workers.
54/// Each worker takes care to invoke your routing and request handling logic, with the help
55/// of [`hyper`].
56#[must_use = "You must call `serve` on a `Server` to start listening for incoming connections"]
57pub struct Server {
58    config: ServerConfiguration,
59    incoming: Vec<IncomingStream>,
60}
61
62impl Default for Server {
63    fn default() -> Self {
64        Self::new()
65    }
66}
67
68impl Server {
69    /// Create a new [`Server`] with default configuration.
70    pub fn new() -> Self {
71        Self {
72            config: ServerConfiguration::default(),
73            incoming: Vec::new(),
74        }
75    }
76
77    /// Configure this [`Server`] according to the values set in the [`ServerConfiguration`]
78    /// passed as input parameter.
79    /// It will overwrite any previous configuration set on this [`Server`].
80    ///
81    /// If you want to retrieve the current configuration, use [`Server::get_config`].
82    pub fn set_config(mut self, config: ServerConfiguration) -> Self {
83        self.config = config;
84        self
85    }
86
87    /// Get a reference to the [`ServerConfiguration`] for this [`Server`].
88    ///
89    /// If you want to overwrite the existing configuration, use [`Server::set_config`].
90    pub fn get_config(&self) -> &ServerConfiguration {
91        &self.config
92    }
93
94    /// Bind the server to the given address: the server will accept incoming connections from this
95    /// address when started.
96    /// Binding an address may fail (e.g. if the address is already in use), therefore this method
97    /// may return an error.
98    ///
99    /// # Related
100    ///
101    /// Check out [`Server::listen`] for an alternative binding mechanism as well as a
102    /// discussion of the pros and cons of [`Server::bind`] vs [`Server::listen`].
103    ///
104    /// # Note
105    ///
106    /// A [`Server`] can be bound to multiple addresses: just call this method multiple times with
107    /// all the addresses you want to bind to.
108    ///
109    /// # Example: bind one address
110    ///
111    /// ```rust
112    /// use std::net::SocketAddr;
113    /// use pavex::server::Server;
114    ///
115    /// # async fn t() -> std::io::Result<()> {
116    /// let addr = SocketAddr::from(([127, 0, 0, 1], 8080));
117    ///
118    /// Server::new()
119    ///     .bind(addr)
120    ///     .await?
121    ///     # ;
122    ///     // [...]
123    /// # Ok(())
124    /// # }
125    /// ```
126    ///
127    /// # Example: bind multiple addresses
128    ///
129    /// ```rust
130    /// use std::net::SocketAddr;
131    /// use pavex::server::Server;
132    ///
133    /// # async fn t() -> std::io::Result<()> {
134    /// let addr1 = SocketAddr::from(([127, 0, 0, 1], 8080));
135    /// let addr2 = SocketAddr::from(([127, 0, 0, 1], 4000));
136    ///
137    /// Server::new()
138    ///     .bind(addr1)
139    ///     .await?
140    ///     .bind(addr2)
141    ///     .await?
142    ///     # ;
143    ///     // [...]
144    /// # Ok(())
145    /// # }
146    /// ````
147    pub async fn bind(mut self, addr: SocketAddr) -> std::io::Result<Self> {
148        let incoming = IncomingStream::bind(addr).await?;
149        self.incoming.push(incoming);
150        Ok(self)
151    }
152
153    /// Ask the server to process incoming connections from the provided [`IncomingStream`].
154    ///
155    /// # [`Server::listen`] vs [`Server::bind`]
156    ///
157    /// [`Server::bind`] only requires you to specify the address you want to listen at. The
158    /// socket configuration is handled by the [`Server`], with a set of reasonable default
159    /// parameters. You have no access to the [`IncomingStream`] that gets bound to the address
160    /// you specified.
161    ///
162    /// [`Server::listen`], instead, expects an [`IncomingStream`].
163    /// You are free to configure the socket as you see please and the [`Server`] will just
164    /// poll it for incoming connections.
165    /// It also allows you to interact with the bound [`IncomingStream`] directly
166    ///
167    /// # Example: bind to a random port
168    ///
169    /// ```rust
170    /// use std::net::SocketAddr;
171    /// use pavex::server::{IncomingStream, Server};
172    ///
173    /// # async fn t() -> std::io::Result<()> {
174    /// // `0` is a special port: it tells the OS to assign us
175    /// // a random **unused** port
176    /// let addr = SocketAddr::from(([127, 0, 0, 1], 0));
177    /// let incoming = IncomingStream::bind(addr).await?;
178    /// // We can then retrieve the actual port we were assigned
179    /// // by the OS.
180    /// let addr = incoming.local_addr()?.to_owned();
181    ///
182    /// Server::new()
183    ///     .listen(incoming);
184    ///     # ;
185    ///     // [...]
186    /// # Ok(())
187    /// # }
188    /// ````
189    ///
190    /// # Example: set a custom socket backlog
191    ///
192    /// ```rust
193    /// use std::net::SocketAddr;
194    /// use socket2::Domain;
195    /// use pavex::server::{IncomingStream, Server};
196    ///
197    /// # async fn t() -> std::io::Result<()> {
198    /// // `0` is a special port: it tells the OS to assign us
199    /// // a random **unused** port
200    /// let addr = SocketAddr::from(([127, 0, 0, 1], 0));
201    ///
202    /// let socket = socket2::Socket::new(
203    ///    Domain::for_address(addr),
204    ///    socket2::Type::STREAM,
205    ///    Some(socket2::Protocol::TCP),
206    /// )
207    /// .expect("Failed to create a socket");
208    /// socket.set_reuse_address(true)?;
209    /// socket.set_nonblocking(true)?;
210    /// socket.bind(&addr.into())?;
211    /// // The custom backlog!
212    /// socket.listen(2048_i32)?;
213    ///
214    /// let listener = std::net::TcpListener::from(socket);
215    /// Server::new()
216    ///     .listen(listener.try_into()?)
217    ///     # ;
218    ///     // [...]
219    /// # Ok(())
220    /// # }
221    /// ````
222    ///
223    /// # Note
224    ///
225    /// A [`Server`] can listen to multiple streams of incoming connections: just call this method
226    /// multiple times!
227    pub fn listen(mut self, incoming: IncomingStream) -> Self {
228        self.incoming.push(incoming);
229        self
230    }
231
232    /// Start listening for incoming connections.
233    ///
234    /// You must specify:
235    ///
236    /// - a handler function, which will be called for each incoming request;
237    /// - the application state, the set of singleton components that will be available to
238    ///   your handler function.
239    ///
240    /// Both the handler function and the application state are usually code-generated by Pavex
241    /// starting from your [`Blueprint`](crate::Blueprint).
242    ///
243    /// # Wait for the server to shut down
244    ///
245    /// `serve` returns a [`ServerHandle`].
246    /// Calling `.await` on the handle lets you wait until the server shuts down.
247    ///
248    /// # Panics
249    ///
250    /// This method will panic if the [`Server`] has no registered sources of incoming connections,
251    /// i.e. if you did not call [`Server::bind`] or [`Server::listen`] before calling `serve`.
252    ///
253    /// If you'd rather handle the error, use [`Server::try_serve`] instead.
254    pub fn serve<HandlerFuture, ApplicationState>(
255        self,
256        handler: fn(
257            http::Request<hyper::body::Incoming>,
258            Option<ConnectionInfo>,
259            ApplicationState,
260        ) -> HandlerFuture,
261        application_state: ApplicationState,
262    ) -> ServerHandle
263    where
264        HandlerFuture: Future<Output = crate::Response> + 'static,
265        ApplicationState: Clone + Send + Sync + 'static,
266    {
267        self.try_serve(handler, application_state).unwrap()
268    }
269
270    /// A fallible version of [`Server::serve`].
271    ///
272    /// It will return an error, rather than panicking, if the [`Server`] has no registered sources
273    /// of incoming connections, i.e. if you did not call [`Server::bind`] or [`Server::listen`]
274    /// before calling `serve`.
275    pub fn try_serve<HandlerFuture, ApplicationState>(
276        self,
277        handler: fn(
278            http::Request<hyper::body::Incoming>,
279            Option<ConnectionInfo>,
280            ApplicationState,
281        ) -> HandlerFuture,
282        application_state: ApplicationState,
283    ) -> Result<ServerHandle, std::io::Error>
284    where
285        HandlerFuture: Future<Output = crate::Response> + 'static,
286        ApplicationState: Clone + Send + Sync + 'static,
287    {
288        if self.incoming.is_empty() {
289            let err_msg = "Cannot serve: there is no source of incoming connections. You must call `bind` or `listen` on the `Server` instance before invoking `serve`.";
290            return Err(std::io::Error::other(err_msg));
291        }
292        Ok(ServerHandle::new(
293            self.config,
294            self.incoming,
295            handler,
296            application_state,
297        ))
298    }
299}