Fri Nov 26 14:37:08 2010 UTC ()
Improve reliability in cases where client disconnects mid-operation.


(pooka)
diff -r1.16 -r1.17 src/lib/librumpuser/rumpuser_sp.c
diff -r1.10 -r1.11 src/lib/librumpuser/sp_common.c

cvs diff -r1.16 -r1.17 src/lib/librumpuser/rumpuser_sp.c (expand / switch to unified diff)

--- src/lib/librumpuser/rumpuser_sp.c 2010/11/26 10:59:14 1.16
+++ src/lib/librumpuser/rumpuser_sp.c 2010/11/26 14:37:08 1.17
@@ -1,14 +1,14 @@ @@ -1,14 +1,14 @@
1/* $NetBSD: rumpuser_sp.c,v 1.16 2010/11/26 10:59:14 pooka Exp $ */ 1/* $NetBSD: rumpuser_sp.c,v 1.17 2010/11/26 14:37:08 pooka Exp $ */
2 2
3/* 3/*
4 * Copyright (c) 2010 Antti Kantee. All Rights Reserved. 4 * Copyright (c) 2010 Antti Kantee. All Rights Reserved.
5 * 5 *
6 * Redistribution and use in source and binary forms, with or without 6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions 7 * modification, are permitted provided that the following conditions
8 * are met: 8 * are met:
9 * 1. Redistributions of source code must retain the above copyright 9 * 1. Redistributions of source code must retain the above copyright
10 * notice, this list of conditions and the following disclaimer. 10 * notice, this list of conditions and the following disclaimer.
11 * 2. Redistributions in binary form must reproduce the above copyright 11 * 2. Redistributions in binary form must reproduce the above copyright
12 * notice, this list of conditions and the following disclaimer in the 12 * notice, this list of conditions and the following disclaimer in the
13 * documentation and/or other materials provided with the distribution. 13 * documentation and/or other materials provided with the distribution.
14 * 14 *
@@ -28,27 +28,27 @@ @@ -28,27 +28,27 @@
28/* 28/*
29 * Sysproxy routines. This provides system RPC support over host sockets. 29 * Sysproxy routines. This provides system RPC support over host sockets.
30 * The most notable limitation is that the client and server must share 30 * The most notable limitation is that the client and server must share
31 * the same ABI. This does not mean that they have to be the same 31 * the same ABI. This does not mean that they have to be the same
32 * machine or that they need to run the same version of the host OS, 32 * machine or that they need to run the same version of the host OS,
33 * just that they must agree on the data structures. This even *might* 33 * just that they must agree on the data structures. This even *might*
34 * work correctly from one hardware architecture to another. 34 * work correctly from one hardware architecture to another.
35 * 35 *
36 * Not finished yet, i.e. don't use in production. Lacks locking plus 36 * Not finished yet, i.e. don't use in production. Lacks locking plus
37 * handling of multiple clients and unexpected connection closes. 37 * handling of multiple clients and unexpected connection closes.
38 */ 38 */
39 39
40#include <sys/cdefs.h> 40#include <sys/cdefs.h>
41__RCSID("$NetBSD: rumpuser_sp.c,v 1.16 2010/11/26 10:59:14 pooka Exp $"); 41__RCSID("$NetBSD: rumpuser_sp.c,v 1.17 2010/11/26 14:37:08 pooka Exp $");
42 42
43#include <sys/types.h> 43#include <sys/types.h>
44#include <sys/atomic.h> 44#include <sys/atomic.h>
45#include <sys/mman.h> 45#include <sys/mman.h>
46#include <sys/socket.h> 46#include <sys/socket.h>
47 47
48#include <arpa/inet.h> 48#include <arpa/inet.h>
49#include <netinet/in.h> 49#include <netinet/in.h>
50#include <netinet/tcp.h> 50#include <netinet/tcp.h>
51 51
52#include <assert.h> 52#include <assert.h>
53#include <errno.h> 53#include <errno.h>
54#include <fcntl.h> 54#include <fcntl.h>
@@ -335,26 +335,33 @@ spcrelease(struct spclient *spc) @@ -335,26 +335,33 @@ spcrelease(struct spclient *spc)
335static void 335static void
336serv_handledisco(unsigned int idx) 336serv_handledisco(unsigned int idx)
337{ 337{
338 struct spclient *spc = &spclist[idx]; 338 struct spclient *spc = &spclist[idx];
339 339
340 DPRINTF(("rump_sp: disconnecting [%u]\n", idx)); 340 DPRINTF(("rump_sp: disconnecting [%u]\n", idx));
341 341
342 pfdlist[idx].fd = -1; 342 pfdlist[idx].fd = -1;
343 pfdlist[idx].revents = 0; 343 pfdlist[idx].revents = 0;
344 pthread_mutex_lock(&spc->spc_mtx); 344 pthread_mutex_lock(&spc->spc_mtx);
345 spc->spc_dying = 1; 345 spc->spc_dying = 1;
346 kickall(spc); 346 kickall(spc);
347 pthread_mutex_unlock(&spc->spc_mtx); 347 pthread_mutex_unlock(&spc->spc_mtx);
 348
 349 /*
 350 * Nobody's going to attempt to send/receive anymore,
 351 * so reinit info relevant to that.
 352 */
 353 memset((char *)spc + SPC_ZEROFF, 0, sizeof(*spc) - SPC_ZEROFF);
 354
348 spcrelease(spc); 355 spcrelease(spc);
349} 356}
350 357
351static unsigned 358static unsigned
352serv_handleconn(int fd, connecthook_fn connhook, int busy) 359serv_handleconn(int fd, connecthook_fn connhook, int busy)
353{ 360{
354 struct sockaddr_storage ss; 361 struct sockaddr_storage ss;
355 socklen_t sl = sizeof(ss); 362 socklen_t sl = sizeof(ss);
356 int newfd, flags; 363 int newfd, flags;
357 unsigned i; 364 unsigned i;
358 365
359 /*LINTED: cast ok */ 366 /*LINTED: cast ok */
360 newfd = accept(fd, (struct sockaddr *)&ss, &sl); 367 newfd = accept(fd, (struct sockaddr *)&ss, &sl);

cvs diff -r1.10 -r1.11 src/lib/librumpuser/sp_common.c (expand / switch to unified diff)

--- src/lib/librumpuser/sp_common.c 2010/11/25 17:59:02 1.10
+++ src/lib/librumpuser/sp_common.c 2010/11/26 14:37:08 1.11
@@ -1,14 +1,14 @@ @@ -1,14 +1,14 @@
1/* $NetBSD: sp_common.c,v 1.10 2010/11/25 17:59:02 pooka Exp $ */ 1/* $NetBSD: sp_common.c,v 1.11 2010/11/26 14:37:08 pooka Exp $ */
2 2
3/* 3/*
4 * Copyright (c) 2010 Antti Kantee. All Rights Reserved. 4 * Copyright (c) 2010 Antti Kantee. All Rights Reserved.
5 * 5 *
6 * Redistribution and use in source and binary forms, with or without 6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions 7 * modification, are permitted provided that the following conditions
8 * are met: 8 * are met:
9 * 1. Redistributions of source code must retain the above copyright 9 * 1. Redistributions of source code must retain the above copyright
10 * notice, this list of conditions and the following disclaimer. 10 * notice, this list of conditions and the following disclaimer.
11 * 2. Redistributions in binary form must reproduce the above copyright 11 * 2. Redistributions in binary form must reproduce the above copyright
12 * notice, this list of conditions and the following disclaimer in the 12 * notice, this list of conditions and the following disclaimer in the
13 * documentation and/or other materials provided with the distribution. 13 * documentation and/or other materials provided with the distribution.
14 * 14 *
@@ -37,26 +37,27 @@ @@ -37,26 +37,27 @@
37#include <sys/socket.h> 37#include <sys/socket.h>
38#include <sys/un.h> 38#include <sys/un.h>
39 39
40#include <arpa/inet.h> 40#include <arpa/inet.h>
41#include <netinet/in.h> 41#include <netinet/in.h>
42#include <netinet/tcp.h> 42#include <netinet/tcp.h>
43 43
44#include <assert.h> 44#include <assert.h>
45#include <errno.h> 45#include <errno.h>
46#include <fcntl.h> 46#include <fcntl.h>
47#include <poll.h> 47#include <poll.h>
48#include <pthread.h> 48#include <pthread.h>
49#include <stdarg.h> 49#include <stdarg.h>
 50#include <stddef.h>
50#include <stdio.h> 51#include <stdio.h>
51#include <stdlib.h> 52#include <stdlib.h>
52#include <string.h> 53#include <string.h>
53#include <unistd.h> 54#include <unistd.h>
54 55
55//#define DEBUG 56//#define DEBUG
56#ifdef DEBUG 57#ifdef DEBUG
57#define DPRINTF(x) mydprintf x 58#define DPRINTF(x) mydprintf x
58static void 59static void
59mydprintf(const char *fmt, ...) 60mydprintf(const char *fmt, ...)
60{ 61{
61 va_list ap; 62 va_list ap;
62 63
@@ -113,42 +114,44 @@ struct respwait { @@ -113,42 +114,44 @@ struct respwait {
113 void *rw_data; 114 void *rw_data;
114 size_t rw_dlen; 115 size_t rw_dlen;
115 116
116 pthread_cond_t rw_cv; 117 pthread_cond_t rw_cv;
117 118
118 TAILQ_ENTRY(respwait) rw_entries; 119 TAILQ_ENTRY(respwait) rw_entries;
119}; 120};
120 121
121struct spclient { 122struct spclient {
122 int spc_fd; 123 int spc_fd;
123 int spc_refcnt; 124 int spc_refcnt;
124 int spc_dying; 125 int spc_dying;
125 126
 127 pthread_mutex_t spc_mtx;
 128 pthread_cond_t spc_cv;
 129
126 struct lwp *spc_mainlwp; 130 struct lwp *spc_mainlwp;
127 pid_t spc_pid; 131 pid_t spc_pid;
128 132
 133 TAILQ_HEAD(, respwait) spc_respwait;
 134
 135 /* rest of the fields are zeroed upon disconnect */
 136#define SPC_ZEROFF offsetof(struct spclient, spc_pfd)
129 struct pollfd *spc_pfd; 137 struct pollfd *spc_pfd;
130 138
131 struct rsp_hdr spc_hdr; 139 struct rsp_hdr spc_hdr;
132 uint8_t *spc_buf; 140 uint8_t *spc_buf;
133 size_t spc_off; 141 size_t spc_off;
134 142
135 pthread_mutex_t spc_mtx; 
136 pthread_cond_t spc_cv; 
137 
138 uint64_t spc_nextreq; 143 uint64_t spc_nextreq;
139 int spc_ostatus, spc_istatus; 144 int spc_ostatus, spc_istatus;
140 
141 TAILQ_HEAD(, respwait) spc_respwait; 
142}; 145};
143#define SPCSTATUS_FREE 0 146#define SPCSTATUS_FREE 0
144#define SPCSTATUS_BUSY 1 147#define SPCSTATUS_BUSY 1
145#define SPCSTATUS_WANTED 2 148#define SPCSTATUS_WANTED 2
146 149
147typedef int (*addrparse_fn)(const char *, struct sockaddr **, int); 150typedef int (*addrparse_fn)(const char *, struct sockaddr **, int);
148typedef int (*connecthook_fn)(int); 151typedef int (*connecthook_fn)(int);
149 152
150static int readframe(struct spclient *); 153static int readframe(struct spclient *);
151static void handlereq(struct spclient *); 154static void handlereq(struct spclient *);
152 155
153static void 156static void
154sendlock(struct spclient *spc) 157sendlock(struct spclient *spc)
@@ -239,27 +242,27 @@ kickwaiter(struct spclient *spc) @@ -239,27 +242,27 @@ kickwaiter(struct spclient *spc)
239 242
240 pthread_mutex_lock(&spc->spc_mtx); 243 pthread_mutex_lock(&spc->spc_mtx);
241 TAILQ_FOREACH(rw, &spc->spc_respwait, rw_entries) { 244 TAILQ_FOREACH(rw, &spc->spc_respwait, rw_entries) {
242 if (rw->rw_reqno == spc->spc_hdr.rsp_reqno) 245 if (rw->rw_reqno == spc->spc_hdr.rsp_reqno)
243 break; 246 break;
244 } 247 }
245 if (rw == NULL) { 248 if (rw == NULL) {
246 printf("PANIC: no waiter\n"); 249 printf("PANIC: no waiter\n");
247 abort(); 250 abort();
248 return; 251 return;
249 } 252 }
250 DPRINTF(("rump_sp: client %p woke up waiter at %p\n", spc, rw)); 253 DPRINTF(("rump_sp: client %p woke up waiter at %p\n", spc, rw));
251 rw->rw_data = spc->spc_buf; 254 rw->rw_data = spc->spc_buf;
252 rw->rw_dlen = (size_t)spc->spc_off; 255 rw->rw_dlen = (size_t)(spc->spc_off - HDRSZ);
253 pthread_cond_signal(&rw->rw_cv); 256 pthread_cond_signal(&rw->rw_cv);
254 pthread_mutex_unlock(&spc->spc_mtx); 257 pthread_mutex_unlock(&spc->spc_mtx);
255 258
256 spc->spc_buf = NULL; 259 spc->spc_buf = NULL;
257 spc->spc_off = 0; 260 spc->spc_off = 0;
258} 261}
259 262
260static void 263static void
261kickall(struct spclient *spc) 264kickall(struct spclient *spc)
262{ 265{
263 struct respwait *rw; 266 struct respwait *rw;
264 267
265 /* DIAGASSERT(mutex_owned(spc_lock)) */ 268 /* DIAGASSERT(mutex_owned(spc_lock)) */
@@ -282,60 +285,63 @@ waitresp(struct spclient *spc, struct re @@ -282,60 +285,63 @@ waitresp(struct spclient *spc, struct re
282 pthread_mutex_unlock(&spc->spc_mtx); 285 pthread_mutex_unlock(&spc->spc_mtx);
283 286
284 pfd.fd = spc->spc_fd; 287 pfd.fd = spc->spc_fd;
285 pfd.events = POLLIN; 288 pfd.events = POLLIN;
286 289
287 for (gotresp = 0; !gotresp; ) { 290 for (gotresp = 0; !gotresp; ) {
288 switch (readframe(spc)) { 291 switch (readframe(spc)) {
289 case 0: 292 case 0:
290 poll(&pfd, 1, INFTIM); 293 poll(&pfd, 1, INFTIM);
291 continue; 294 continue;
292 case -1: 295 case -1:
293 rv = errno; 296 rv = errno;
294 spc->spc_dying = 1; 297 spc->spc_dying = 1;
295 break; 298 goto cleanup;
296 default: 299 default:
297 break; 300 break;
298 } 301 }
299 302
300 switch (spc->spc_hdr.rsp_class) { 303 switch (spc->spc_hdr.rsp_class) {
301 case RUMPSP_RESP: 304 case RUMPSP_RESP:
302 kickwaiter(spc); 305 kickwaiter(spc);
303 gotresp = spc->spc_hdr.rsp_reqno == 306 gotresp = spc->spc_hdr.rsp_reqno ==
304 rw->rw_reqno; 307 rw->rw_reqno;
305 break; 308 break;
306 case RUMPSP_REQ: 309 case RUMPSP_REQ:
307 handlereq(spc); 310 handlereq(spc);
308 break; 311 break;
309 default: 312 default:
310 /* panic */ 313 /* panic */
311 break; 314 break;
312 } 315 }
313 } 316 }
 317 cleanup:
314 pthread_mutex_lock(&spc->spc_mtx); 318 pthread_mutex_lock(&spc->spc_mtx);
315 if (spc->spc_istatus == SPCSTATUS_WANTED) 319 if (spc->spc_istatus == SPCSTATUS_WANTED)
316 kickall(spc); 320 kickall(spc);
317 spc->spc_istatus = SPCSTATUS_FREE; 321 spc->spc_istatus = SPCSTATUS_FREE;
318 } else { 322 } else {
319 spc->spc_istatus = SPCSTATUS_WANTED; 323 spc->spc_istatus = SPCSTATUS_WANTED;
320 pthread_cond_wait(&rw->rw_cv, &spc->spc_mtx); 324 pthread_cond_wait(&rw->rw_cv, &spc->spc_mtx);
321 } 325 }
322 } 326 }
323 327
324 TAILQ_REMOVE(&spc->spc_respwait, rw, rw_entries); 328 TAILQ_REMOVE(&spc->spc_respwait, rw, rw_entries);
325 pthread_mutex_unlock(&spc->spc_mtx); 329 pthread_mutex_unlock(&spc->spc_mtx);
326 330
327 pthread_cond_destroy(&rw->rw_cv); 331 pthread_cond_destroy(&rw->rw_cv);
328 332
 333 if (rv == 0 && spc->spc_dying)
 334 rv = ENOTCONN;
329 return rv; 335 return rv;
330} 336}
331 337
332static int 338static int
333readframe(struct spclient *spc) 339readframe(struct spclient *spc)
334{ 340{
335 int fd = spc->spc_fd; 341 int fd = spc->spc_fd;
336 size_t left; 342 size_t left;
337 size_t framelen; 343 size_t framelen;
338 ssize_t n; 344 ssize_t n;
339 345
340 /* still reading header? */ 346 /* still reading header? */
341 if (spc->spc_off < HDRSZ) { 347 if (spc->spc_off < HDRSZ) {