Thu Oct 22 19:57:47 2015 UTC ()
Add a change that was missing from the previous commit; it had been made only on the machine where the scan change was tested.


(joerg)
diff -r1.9 -r1.10 pkgsrc/pkgtools/pbulk/files/pbulk/pscan/master.c

cvs diff -r1.9 -r1.10 pkgsrc/pkgtools/pbulk/files/pbulk/pscan/master.c (switch to unified diff)

--- pkgsrc/pkgtools/pbulk/files/pbulk/pscan/master.c 2015/10/21 23:03:17 1.9
+++ pkgsrc/pkgtools/pbulk/files/pbulk/pscan/master.c 2015/10/22 19:57:47 1.10
@@ -1,297 +1,296 @@ @@ -1,297 +1,296 @@
1/* $NetBSD: master.c,v 1.9 2015/10/21 23:03:17 joerg Exp $ */ 1/* $NetBSD: master.c,v 1.10 2015/10/22 19:57:47 joerg Exp $ */
2 2
3/*- 3/*-
4 * Copyright (c) 2007, 2009 Joerg Sonnenberger <joerg@NetBSD.org>. 4 * Copyright (c) 2007, 2009 Joerg Sonnenberger <joerg@NetBSD.org>.
5 * All rights reserved. 5 * All rights reserved.
6 * 6 *
7 * This code was developed as part of Google's Summer of Code 2007 program. 7 * This code was developed as part of Google's Summer of Code 2007 program.
8 * 8 *
9 * Redistribution and use in source and binary forms, with or without 9 * Redistribution and use in source and binary forms, with or without
10 * modification, are permitted provided that the following conditions 10 * modification, are permitted provided that the following conditions
11 * are met: 11 * are met:
12 * 12 *
13 * 1. Redistributions of source code must retain the above copyright 13 * 1. Redistributions of source code must retain the above copyright
14 * notice, this list of conditions and the following disclaimer. 14 * notice, this list of conditions and the following disclaimer.
15 * 2. Redistributions in binary form must reproduce the above copyright 15 * 2. Redistributions in binary form must reproduce the above copyright
16 * notice, this list of conditions and the following disclaimer in 16 * notice, this list of conditions and the following disclaimer in
17 * the documentation and/or other materials provided with the 17 * the documentation and/or other materials provided with the
18 * distribution. 18 * distribution.
19 * 19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 21 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
23 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 23 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
24 * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 24 * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
25 * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING, 25 * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
26 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 26 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
27 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED 27 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
28 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, 28 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
29 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT 29 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
30 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 30 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
31 * SUCH DAMAGE. 31 * SUCH DAMAGE.
32 */ 32 */
33 33
34#include <nbcompat.h> 34#include <nbcompat.h>
35 35
36#include <nbcompat/types.h> 36#include <nbcompat/types.h>
37#include <nbcompat/queue.h> 37#include <nbcompat/queue.h>
38#include <sys/ioctl.h> 38#include <sys/ioctl.h>
39#include <sys/socket.h> 39#include <sys/socket.h>
40#include <nbcompat/time.h> 40#include <nbcompat/time.h>
41#include <sys/wait.h> 41#include <sys/wait.h>
42#include <nbcompat/err.h> 42#include <nbcompat/err.h>
43#include <signal.h> 43#include <signal.h>
44#include <fcntl.h> 44#include <fcntl.h>
45#include <nbcompat/stdlib.h> 45#include <nbcompat/stdlib.h>
46#include <nbcompat/stdio.h> 46#include <nbcompat/stdio.h>
47#include <nbcompat/string.h> 47#include <nbcompat/string.h>
48 48
49#include <arpa/inet.h> 49#include <arpa/inet.h>
50 50
51#include "pbulk.h" 51#include "pbulk.h"
52#include "pscan.h" 52#include "pscan.h"
53 53
54static int clients_started; 54static int clients_started;
55static LIST_HEAD(, scan_peer) active_peers, inactive_peers; 55static LIST_HEAD(, scan_peer) active_peers, inactive_peers;
56static struct event listen_event; 56static struct event listen_event;
57static int listen_event_socket; 57static int listen_event_socket;
58static struct signal_event child_event; 58static struct signal_event child_event;
59static pid_t child_pid; 59static pid_t child_pid;
60 60
/*
 * Per-connection state of one scan client.
 */
struct scan_peer {
	/* Linkage on either active_peers or inactive_peers. */
	LIST_ENTRY(scan_peer) peer_link;

	/* Job currently assigned by assign_job(); NULL while the peer is idle. */
	struct scan_job *job;

	/* Connected socket as returned by accept(). */
	int fd;
	/* Scratch space for the 2 byte and 4 byte length fields on the wire. */
	char tmp_buf[4];

	/* Output length announced by the peer, recorded by recv_output(). */
	size_t output_len;
};

/* Defined further down; the I/O callbacks above its definition call it. */
static void assign_job(struct scan_peer *);
73 73
/*
 * Completion callback for deferred I/O whose result needs no follow-up
 * action.  The argument is ignored.
 */
static void
do_nothing(void *arg)
{
	(void)arg;
}
78 78
79static void 79static void
80kill_peer(void *arg) 80kill_peer(void *arg)
81{ 81{
82 struct scan_peer *peer = arg; 82 struct scan_peer *peer = arg;
83 83
84 (void)close(peer->fd); 84 (void)close(peer->fd);
85 LIST_REMOVE(peer, peer_link); 85 LIST_REMOVE(peer, peer_link);
86 free(peer->job->scan_output); 86 free(peer->job->scan_output);
87 peer->job->scan_output = NULL; 87 peer->job->scan_output = NULL;
88 process_job(peer->job, JOB_OPEN); 88 process_job(peer->job, JOB_OPEN);
89 free(peer); 89 free(peer);
90 90
91 peer = LIST_FIRST(&inactive_peers); 91 peer = LIST_FIRST(&inactive_peers);
92 if (peer == NULL) 92 if (peer == NULL)
93 return; 93 return;
94 LIST_REMOVE(peer, peer_link); 94 LIST_REMOVE(peer, peer_link);
95 assign_job(peer); 95 assign_job(peer);
96} 96}
97 97
98static void 98static void
99finish_job(void *arg) 99finish_job(void *arg)
100{ 100{
101 struct scan_peer *peer = arg; 101 struct scan_peer *peer = arg;
102 102
103 if (strlen(peer->job->scan_output) != peer->output_len) { 103 if (strlen(peer->job->scan_output) != peer->output_len) {
104 warnx("Invalid output len received from peer"); 104 warnx("Invalid output len received from peer");
105 kill_peer(peer); 105 kill_peer(peer);
106 return; 106 return;
107 } 107 }
108 LIST_REMOVE(peer, peer_link); 108 LIST_REMOVE(peer, peer_link);
109 process_job(peer->job, JOB_DONE); 109 process_job(peer->job, JOB_DONE);
110 assign_job(peer); 110 assign_job(peer);
111} 111}
112 112
113static void 113static void
114recv_output(void *arg) 114recv_output(void *arg)
115{ 115{
116 struct scan_peer *peer = arg; 116 struct scan_peer *peer = arg;
117 uint32_t output_len; 117 uint32_t output_len;
118 118
119 (void)memcpy(&output_len, peer->tmp_buf, 4); 119 (void)memcpy(&output_len, peer->tmp_buf, 4);
120 output_len = ntohl(output_len); 120 output_len = ntohl(output_len);
121 if (output_len == 0) { 121 if (output_len == 0) {
122 LIST_REMOVE(peer, peer_link); 122 LIST_REMOVE(peer, peer_link);
123 process_job(peer->job, JOB_DONE); 123 process_job(peer->job, JOB_DONE);
124 assign_job(peer); 124 assign_job(peer);
125 return; 125 return;
126 } 126 }
127 if (output_len == 0xffffff) { 127 if (output_len == 0xffffff) {
128 warnx("Invalid output len received from peer"); 128 warnx("Invalid output len received from peer");
129 kill_peer(peer); 129 kill_peer(peer);
130 return; 130 return;
131 } 131 }
132 peer->job->scan_output = xmalloc(output_len + 1); 132 peer->job->scan_output = xmalloc(output_len + 1);
133 peer->job->scan_output[output_len] = '\0'; 133 peer->job->scan_output[output_len] = '\0';
134 peer->output_len = output_len; 134 peer->output_len = output_len;
135 deferred_read(peer->fd, peer->job->scan_output, output_len, peer, finish_job, kill_peer); 135 deferred_read(peer->fd, peer->job->scan_output, output_len, peer, finish_job, kill_peer);
136} 136}
137 137
138static void 138static void
139recv_output_len(void *arg) 139recv_output_len(void *arg)
140{ 140{
141 struct scan_peer *peer = arg; 141 struct scan_peer *peer = arg;
142 142
143 deferred_read(peer->fd, peer->tmp_buf, 4, peer, recv_output, kill_peer); 143 deferred_read(peer->fd, peer->tmp_buf, 4, peer, recv_output, kill_peer);
144} 144}
145 145
146static void 146static void
147send_job_path(void *arg) 147send_job_path(void *arg)
148{ 148{
149 struct scan_peer *peer = arg; 149 struct scan_peer *peer = arg;
150 150
151 deferred_write(peer->fd, peer->job->pkg_location, 151 deferred_write(peer->fd, peer->job->pkg_location,
152 strlen(peer->job->pkg_location), peer, recv_output_len, 152 strlen(peer->job->pkg_location), peer, recv_output_len,
153 kill_peer); 153 kill_peer);
154} 154}
155 155
156static void 156static void
157shutdown_master(void) 157shutdown_master(void)
158{ 158{
159 struct timeval tv; 159 struct timeval tv;
160 struct scan_peer *peer; 160 struct scan_peer *peer;
161 161
162 event_del(&listen_event); 162 event_del(&listen_event);
163 (void)close(listen_event_socket); 163 (void)close(listen_event_socket);
164 LIST_FOREACH(peer, &inactive_peers, peer_link) { 164 LIST_FOREACH(peer, &inactive_peers, peer_link) {
165 (void)shutdown(peer->fd, SHUT_RDWR); 165 uint16_t net_job_len = htons(0);
166 uint16_t net_job_len = htons(job_len); 
167 (void)memcpy(peer->tmp_buf, &net_job_len, 2); 166 (void)memcpy(peer->tmp_buf, &net_job_len, 2);
168 167
169 deferred_write(peer->fd, peer->tmp_buf, 2, peer, do_nothing, 168 deferred_write(peer->fd, peer->tmp_buf, 2, peer, do_nothing,
170 kill_peer); 169 kill_peer);
171 } 170 }
172 tv.tv_sec = 1; 171 tv.tv_sec = 1;
173 tv.tv_usec = 0; 172 tv.tv_usec = 0;
174 event_loopexit(&tv); 173 event_loopexit(&tv);
175} 174}
176 175
177static void 176static void
178assign_job(struct scan_peer *peer) 177assign_job(struct scan_peer *peer)
179{ 178{
180 size_t job_len; 179 size_t job_len;
181 uint16_t net_job_len; 180 uint16_t net_job_len;
182 181
183 peer->job = clients_started ? get_job() : NULL; 182 peer->job = clients_started ? get_job() : NULL;
184 if (peer->job == NULL) { 183 if (peer->job == NULL) {
185 LIST_INSERT_HEAD(&inactive_peers, peer, peer_link); 184 LIST_INSERT_HEAD(&inactive_peers, peer, peer_link);
186 if (LIST_EMPTY(&active_peers) && clients_started) 185 if (LIST_EMPTY(&active_peers) && clients_started)
187 shutdown_master(); 186 shutdown_master();
188 return; 187 return;
189 } 188 }
190 189
191 LIST_INSERT_HEAD(&active_peers, peer, peer_link); 190 LIST_INSERT_HEAD(&active_peers, peer, peer_link);
192 191
193 peer->job->scan_output = NULL; 192 peer->job->scan_output = NULL;
194 193
195 job_len = strlen(peer->job->pkg_location); 194 job_len = strlen(peer->job->pkg_location);
196 if (job_len > 0xffff) 195 if (job_len > 0xffff)
197 errx(1, "Location inside pkgsrc tree too long"); 196 errx(1, "Location inside pkgsrc tree too long");
198 net_job_len = htons(job_len); 197 net_job_len = htons(job_len);
199 (void)memcpy(peer->tmp_buf, &net_job_len, 2); 198 (void)memcpy(peer->tmp_buf, &net_job_len, 2);
200 199
201 deferred_write(peer->fd, peer->tmp_buf, 2, peer, send_job_path, 200 deferred_write(peer->fd, peer->tmp_buf, 2, peer, send_job_path,
202 kill_peer); 201 kill_peer);
203} 202}
204 203
205static void 204static void
206listen_handler(int sock, void *arg) 205listen_handler(int sock, void *arg)
207{ 206{
208 struct scan_peer *peer; 207 struct scan_peer *peer;
209 struct sockaddr_in src; 208 struct sockaddr_in src;
210 socklen_t src_len; 209 socklen_t src_len;
211 int fd; 210 int fd;
212 211
213 src_len = sizeof(src); 212 src_len = sizeof(src);
214 if ((fd = accept(sock, (struct sockaddr *)&src, &src_len)) == -1) { 213 if ((fd = accept(sock, (struct sockaddr *)&src, &src_len)) == -1) {
215 warn("Could not accept connection"); 214 warn("Could not accept connection");
216 return; 215 return;
217 } 216 }
218 if (set_nonblocking(fd) == -1) { 217 if (set_nonblocking(fd) == -1) {
219 (void)close(fd); 218 (void)close(fd);
220 warn("Could not set non-blocking IO"); 219 warn("Could not set non-blocking IO");
221 return; 220 return;
222 } 221 }
223 222
224 peer = xmalloc(sizeof(*peer)); 223 peer = xmalloc(sizeof(*peer));
225 peer->fd = fd; 224 peer->fd = fd;
226 assign_job(peer); 225 assign_job(peer);
227} 226}
228 227
229static void 228static void
230child_handler(struct signal_event *ev) 229child_handler(struct signal_event *ev)
231{ 230{
232 struct scan_peer *peer; 231 struct scan_peer *peer;
233 int status; 232 int status;
234 233
235 if (waitpid(child_pid, &status, WNOHANG) == -1) { 234 if (waitpid(child_pid, &status, WNOHANG) == -1) {
236 if (errno == ECHILD) 235 if (errno == ECHILD)
237 return; 236 return;
238 err(1, "Could not wait for child"); 237 err(1, "Could not wait for child");
239 } 238 }
240 if (status != 0) 239 if (status != 0)
241 err(1, "Start script failed"); 240 err(1, "Start script failed");
242 241
243 clients_started = 1; 242 clients_started = 1;
244 signal_del(ev); 243 signal_del(ev);
245 244
246 while ((peer = LIST_FIRST(&inactive_peers)) != NULL) { 245 while ((peer = LIST_FIRST(&inactive_peers)) != NULL) {
247 LIST_REMOVE(peer, peer_link); 246 LIST_REMOVE(peer, peer_link);
248 assign_job(peer); 247 assign_job(peer);
249 if (peer-> job == NULL) 248 if (peer-> job == NULL)
250 break; 249 break;
251 } 250 }
252} 251}
253 252
254void 253void
255master_mode(const char *master_port, const char *start_script) 254master_mode(const char *master_port, const char *start_script)
256{ 255{
257 struct sockaddr_in dst; 256 struct sockaddr_in dst;
258 int fd; 257 int fd;
259 258
260 LIST_INIT(&active_peers); 259 LIST_INIT(&active_peers);
261 LIST_INIT(&inactive_peers); 260 LIST_INIT(&inactive_peers);
262 261
263 event_init(); 262 event_init();
264 263
265 if (parse_sockaddr_in(master_port, &dst)) 264 if (parse_sockaddr_in(master_port, &dst))
266 errx(1, "Could not parse addr/port"); 265 errx(1, "Could not parse addr/port");
267 266
268 fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); 267 fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
269 if (fd == -1) 268 if (fd == -1)
270 err(1, "Could not create socket");  269 err(1, "Could not create socket");
271 if (fcntl(fd, F_SETFD, FD_CLOEXEC) == -1) 270 if (fcntl(fd, F_SETFD, FD_CLOEXEC) == -1)
272 err(1, "Could not set close-on-exec flag"); 271 err(1, "Could not set close-on-exec flag");
273 if (bind(fd, (struct sockaddr *)&dst, sizeof(dst)) == -1) 272 if (bind(fd, (struct sockaddr *)&dst, sizeof(dst)) == -1)
274 err(1, "Could not bind socket"); 273 err(1, "Could not bind socket");
275 if (listen(fd, 5) == -1) 274 if (listen(fd, 5) == -1)
276 err(1, "Could not listen on socket"); 275 err(1, "Could not listen on socket");
277 276
278 event_add(&listen_event, fd, 0, 1, listen_handler, NULL); 277 event_add(&listen_event, fd, 0, 1, listen_handler, NULL);
279 listen_event_socket = fd; 278 listen_event_socket = fd;
280 279
281 if (start_script) { 280 if (start_script) {
282 signal_add(&child_event, SIGCHLD, child_handler); 281 signal_add(&child_event, SIGCHLD, child_handler);
283 282
284 if ((child_pid = vfork()) == 0) { 283 if ((child_pid = vfork()) == 0) {
285 execlp(start_script, start_script, (char *)NULL); 284 execlp(start_script, start_script, (char *)NULL);
286 _exit(255); 285 _exit(255);
287 } 286 }
288 if (child_pid == -1) 287 if (child_pid == -1)
289 err(1, "Could not fork start script"); 288 err(1, "Could not fork start script");
290 } else { 289 } else {
291 clients_started = 1; 290 clients_started = 1;
292 } 291 }
293 292
294 event_dispatch(); 293 event_dispatch();
295 294
296 (void)close(fd); 295 (void)close(fd);
297} 296}