1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
use crate::{worker::worker, Options, Task, TaskProvider};
use std::{
fs,
io::{self, Error as IoError},
os::unix::net::UnixListener,
path::PathBuf,
sync::{mpsc, Arc, Mutex},
thread,
};
/// Server endpoint listening on a Unix domain socket.
///
/// Holds everything `bind` needs: where the socket file lives, the runtime
/// options (including how many worker threads to start) and the task
/// provider that is cloned into every worker.
pub struct UnixDomainSocket<T>
where
    T: TaskProvider + 'static,
{
    /// Filesystem location of the socket file.
    path: PathBuf,
    /// Runtime configuration; `options.workers` sets the worker thread count.
    options: Options,
    /// Provider handed (as a clone) to each worker thread.
    provider: T,
}
impl<T: TaskProvider> UnixDomainSocket<T> {
pub fn new<P: Into<PathBuf>>(path: P, options: Option<Options>, provider: T) -> Self {
let path = path.into();
let options = options.unwrap_or(Options::default());
UnixDomainSocket {
path,
options,
provider,
}
}
pub fn bind(self) -> Result<(), IoError> {
if self.path.as_path().exists() {
fs::remove_file(self.path.as_path())?;
}
let path = self
.path
.to_str()
.map(|p| Ok(p))
.unwrap_or(Err(IoError::new(
io::ErrorKind::Other,
"Invalid path returned by the buffer",
)))?;
let (tx, rx) = mpsc::channel();
let rx = Mutex::new(rx);
let rx = Arc::new(rx);
let listener = UnixListener::bind(path)?;
info!("UnixDomainSocket bound on {}", path);
let workers: Vec<thread::JoinHandle<_>> = (0..self.options.workers)
.map(|_| {
let t = tx.clone();
let r = Arc::clone(&rx);
let p = self.provider.clone();
thread::spawn(move || worker(t, r, p))
})
.collect();
let t = tx.clone();
thread::spawn(move || {
for socket in listener.incoming() {
socket
.and_then(|s| {
t.send(Task::Socket(s))
.map_err(|e| IoError::new(io::ErrorKind::Other, e))
})
.unwrap_or_else(|e| {
error!("Error receiving the UDS socket: {}", e);
});
}
});
for w in workers {
w.join().unwrap_or_else(|e| {
error!("Error ending the worker thread gracefully: {:?}", e);
});
}
Ok(info!("Unbinding UDS"))
}
}