HepLib
Loading...
Searching...
No Matches
PoolPipe.cpp
Go to the documentation of this file.
1
6#include "PoolPipe.h"
7#include "unistd.h"
8#include <stdexcept>
9#include <iostream>
10#include <arpa/inet.h>
11
12namespace HepLib {
13
14 Pool::Pool(const std::vector<void*> & vec) {
15 for (auto item : vec) ava.push_back(item);
16 }
17
18 void* Pool::acquire() {
19 std::unique_lock<std::mutex> lock(mtx);
20 while (ava.empty()) cv.wait(lock);
21 void* obj = ava.back();
22 ava.pop_back();
23 return obj;
24 }
25
26 void Pool::release(void* obj) {
27 {
28 std::lock_guard<std::mutex> lock(mtx);
29 ava.push_back(obj);
30 }
31 cv.notify_one();
32 }
33
34 iPool::iPool(int tot) {
35 for (int i=0; i<tot; i++) ava.push_back(i);
36 inited = true;
37 }
38
40
41 void iPool::init(int tot) {
42 for (int i=0; i<tot; i++) ava.push_back(i);
43 inited = true;
44 }
45
47 if(!inited) throw std::runtime_error("iPool: not initialized.");
48 std::unique_lock<std::mutex> lock(mtx);
49 while (ava.empty()) cv.wait(lock);
50 int obj = ava.back();
51 ava.pop_back();
52 return obj;
53 }
54
55 void iPool::release(int obj) {
56 if(!inited) throw std::runtime_error("iPool: not initialized.");
57 {
58 std::lock_guard<std::mutex> lock(mtx);
59 ava.push_back(obj);
60 }
61 cv.notify_one();
62 }
63
64 Pipe::Pipe(const std::function<std::string(const std::string &)> & fi) : f(fi) {
65 if (pipe(p2c)==-1 || pipe(c2p)==-1) {
66 perror("pipe");
67 exit(EXIT_FAILURE);
68 }
69
70 pid = fork();
71 if (pid == -1) {
72 perror("fork");
73 exit(EXIT_FAILURE);
74 }
75
76 if (pid==0) { // child process
77 close(p2c[1]); // close the write side
78 close(c2p[0]); // close the read side
79 char length_buffer[4];
80 uint32_t length;
81
82 while (true) {
83 ssize_t bytesRead = read(p2c[0], length_buffer, sizeof(length_buffer));
84 if (bytesRead <= 0) break;
85 length = ntohl(*(uint32_t*)length_buffer);
86
87 char* buffer = new char[length+1]; // +1 for '\0'
88 bytesRead = read(p2c[0], buffer, length);
89 buffer[bytesRead] = '\0';
90
91 std::string cmd(buffer);
92 delete[] buffer;
93
94 auto res = f(cmd); // cmd -> res
95
96 length = htonl(res.size());
97 write(c2p[1], &length, sizeof(length));
98 write(c2p[1], res.c_str(), res.size());
99 }
100
101 close(p2c[0]);
102 close(c2p[1]);
103 exit(EXIT_SUCCESS);
104 } else {
105 close(p2c[0]); // close the read
106 close(c2p[1]); // close the write
107 }
108 }
109
111 close(p2c[1]); // close the write
112 close(c2p[0]); // close the read
113 waitpid(pid, NULL, 0);
114 }
115
116 std::string Pipe::run(const std::string &code) {
117 uint32_t length = htonl(code.size());
118 write(p2c[1], &length, sizeof(length));
119 write(p2c[1], code.c_str(), code.size());
120
121 char length_buffer[4];
122 read(c2p[0], length_buffer, sizeof(length_buffer));
123 length = ntohl(*(uint32_t*)length_buffer);
124
125 char* buffer = new char[length+1]; // +1 for '\0'
126 ssize_t bytesRead = read(c2p[0], buffer, length);
127 buffer[bytesRead] = '\0';
128 std::string result(buffer);
129 delete[] buffer;
130
131 return result;
132 }
133
134 PipePool::PipePool(int size, const std::function<std::string(const std::string &)> & f) : pipe_ptr_vec(size) {
135 std::vector<void*> void_ptr_vec(size);
136 for(int i=0; i<size; i++) {
137 pipe_ptr_vec[i] = new Pipe(f);
138 void_ptr_vec[i] = pipe_ptr_vec[i];
139 }
140 pool = new Pool(void_ptr_vec);
141 }
142
144 for(int i=pipe_ptr_vec.size()-1; i>=0; i--) delete pipe_ptr_vec[i];
145 delete pool;
146 }
147
148 std::string PipePool::run(const std::string &code) {
149 auto worker = (Pipe*)(pool->acquire());
150 auto res = worker->run(code);
151 pool->release(worker);
152 return res;
153 }
154
155 std::vector<std::string> PipePool::run_all(const std::string &code) {
156 std::vector<std::string> res_vec(pipe_ptr_vec.size());
157 for(int i=0; i<res_vec.size(); i++) {
158 res_vec[i] = pipe_ptr_vec[i]->run(code);
159 }
160 return std::move(res_vec);
161 }
162
163}
PoolPipe header file.
std::string run(const std::string &code)
Definition PoolPipe.cpp:148
std::vector< std::string > run_all(const std::string &code)
Definition PoolPipe.cpp:155
PipePool(int size, const std::function< std::string(const std::string &)> &f)
Definition PoolPipe.cpp:134
Pipe(const std::function< std::string(const std::string &)> &fi)
Definition PoolPipe.cpp:64
std::string run(const std::string &code)
Definition PoolPipe.cpp:116
void * acquire()
Definition PoolPipe.cpp:18
void release(void *obj)
Definition PoolPipe.cpp:26
Pool(const std::vector< void * > &vec)
Definition PoolPipe.cpp:14
void init(int tot)
Definition PoolPipe.cpp:41
void release(int obj)
Definition PoolPipe.cpp:55
HepLib namespace.
Definition BASIC.cpp:17