Files
aho_corasick
ansi_term
atty
backtrace
backtrace_sys
bitflags
blindbid
block_buffer
block_padding
bulletproofs
byte_tools
byteorder
cfg_if
chrono
clap
clear_on_drop
curve25519_dalek
digest
dusk_blindbidproof
dusk_tlv
dusk_uds
env_logger
failure
failure_derive
fake_simd
futures
futures_channel
futures_core
futures_executor
futures_io
futures_macro
futures_sink
futures_task
futures_util
async_await
future
io
lock
sink
stream
task
generic_array
humantime
keccak
lazy_static
libc
log
memchr
merlin
num_cpus
num_integer
num_traits
opaque_debug
packed_simd
pin_utils
proc_macro2
proc_macro_hack
proc_macro_nested
quick_error
quote
rand
rand_chacha
rand_core
rand_hc
rand_isaac
rand_jitter
rand_os
rand_pcg
rand_xorshift
regex
regex_syntax
rustc_demangle
serde
serde_derive
sha2
sha3
slab
strsim
subtle
syn
synstructure
termcolor
textwrap
thread_local
time
typenum
unicode_width
unicode_xid
vec_map
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
use crate::{worker::worker, Options, Task, TaskProvider};

use std::{
    fs,
    io::{self, Error as IoError},
    os::unix::net::UnixListener,
    path::PathBuf,
    sync::{mpsc, Arc, Mutex},
    thread,
};

/// Boilerplate for [`UnixListener`].
///
/// Will receive a path to bind to, a set of options and an implementation of future that will
/// handle the incoming sockets.
pub struct UnixDomainSocket<T: TaskProvider + 'static> {
    path: PathBuf,
    options: Options,
    provider: T,
}

impl<T: TaskProvider> UnixDomainSocket<T> {
    /// Default constructor.
    pub fn new<P: Into<PathBuf>>(path: P, options: Option<Options>, provider: T) -> Self {
        let path = path.into();
        let options = options.unwrap_or(Options::default());

        UnixDomainSocket {
            path,
            options,
            provider,
        }
    }

    /// Will remove the [`UnixDomainSocket::path`], if it exists, so it cant bind properly to that
    /// location.
    ///
    /// If the future returns a [`crate::Message::ShouldQuit`], the worker threads will be finished after
    /// the current queue of sockets and the main loop will end.
    pub fn bind(self) -> Result<(), IoError> {
        // Grant the provided path is available to the process
        if self.path.as_path().exists() {
            fs::remove_file(self.path.as_path())?;
        }

        // Prepare the path to bind
        let path = self
            .path
            .to_str()
            .map(|p| Ok(p))
            .unwrap_or(Err(IoError::new(
                io::ErrorKind::Other,
                "Invalid path returned by the buffer",
            )))?;

        // Create the task queue channel that will be share amongst the worker threads
        let (tx, rx) = mpsc::channel();
        let rx = Mutex::new(rx);
        let rx = Arc::new(rx);

        // Perform the bind
        let listener = UnixListener::bind(path)?;
        info!("UnixDomainSocket bound on {}", path);

        // Spawn the workers, each opne with an ownership to the queue channel, and the future
        // provider
        let workers: Vec<thread::JoinHandle<_>> = (0..self.options.workers)
            .map(|_| {
                let t = tx.clone();
                let r = Arc::clone(&rx);
                let p = self.provider.clone();

                thread::spawn(move || worker(t, r, p))
            })
            .collect();

        // Spawn a thread to perform the actual listening.
        //
        // When there is an incoming socket, transform it to a Task and send to the queue channel
        let t = tx.clone();
        thread::spawn(move || {
            for socket in listener.incoming() {
                socket
                    .and_then(|s| {
                        t.send(Task::Socket(s))
                            .map_err(|e| IoError::new(io::ErrorKind::Other, e))
                    })
                    .unwrap_or_else(|e| {
                        error!("Error receiving the UDS socket: {}", e);
                    });
            }
        });

        // Wait until all the workers are finished
        for w in workers {
            w.join().unwrap_or_else(|e| {
                error!("Error ending the worker thread gracefully: {:?}", e);
            });
        }

        Ok(info!("Unbinding UDS"))
    }
}