blob: c2dc35b4f8bf8b447e878069127d4d19b58cfd99 [file] [log] [blame]
danb0b27ab2018-12-07 20:25:14 +00001/*
2** 2017 June 7
3**
4** The author disclaims copyright to this source code. In place of
5** a legal notice, here is a blessing:
6**
7** May you do good and not evil.
8** May you find forgiveness for yourself and forgive others.
9** May you share freely, never taking more than you give.
10**
11*************************************************************************
12**
13** Simple multi-threaded server used for informal testing of concurrency
14** between connections in different threads. Listens for tcp/ip connections
15** on port 9999 of the 127.0.0.1 interface only. To build:
16**
17** gcc -g $(TOP)/tool/tserver.c sqlite3.o -lpthread -o tserver
18**
19** To run using "x.db" as the db file:
20**
21** ./tserver x.db
22**
23** To connect, open a client socket on port 9999 and start sending commands.
24** Commands are either SQL - which must be terminated by a semi-colon, or
25** dot-commands, which must be terminated by a newline. If an SQL statement
26** is seen, it is prepared and added to an internal list.
27**
28** Dot-commands are:
29**
30** .list Display all SQL statements in the list.
31** .quit Disconnect.
32** .run Run all SQL statements in the list.
33** .repeats N Configure the number of repeats per ".run".
34** .seconds N Configure the number of seconds to ".run" for.
35** .mutex_commit Add a "COMMIT" protected by a g.commit_mutex
36** to the current SQL.
37** .stop Stop the tserver process - exit(0).
38**
39** Example input:
40**
41** BEGIN;
42** INSERT INTO t1 VALUES(randomblob(10), randomblob(100));
43** INSERT INTO t1 VALUES(randomblob(10), randomblob(100));
44** INSERT INTO t1 VALUES(randomblob(10), randomblob(100));
45** COMMIT;
46** .repeats 100000
47** .run
48**
49*/
50#define TSERVER_PORTNUMBER 9999
51
52#include <arpa/inet.h>
53#include <assert.h>
54#include <pthread.h>
55#include <signal.h>
56#include <stdint.h>
57#include <stdio.h>
58#include <stdlib.h>
59#include <string.h>
60#include <sys/socket.h>
61#include <sys/time.h>
62#include <unistd.h>
63
64#include "sqlite3.h"
65
66#define TSERVER_DEFAULT_CHECKPOINT_THRESHOLD 3900
67
68/* Global variables */
69struct TserverGlobal {
70 char *zDatabaseName; /* Database used by this server */
71 char *zVfs;
72 sqlite3_mutex *commit_mutex;
73 sqlite3 *db; /* Global db handle */
74
75 /* The following use native pthreads instead of a portable interface. This
76 ** is because a condition variable, as well as a mutex, is required. */
77 pthread_mutex_t ckpt_mutex;
78 pthread_cond_t ckpt_cond;
79 int nThreshold; /* Checkpoint when wal is this large */
80 int bCkptRequired; /* True if wal checkpoint is required */
81 int nRun; /* Number of clients in ".run" */
82 int nWait; /* Number of clients waiting on ckpt_cond */
83};
84
85static struct TserverGlobal g = {0};
86
87typedef struct ClientSql ClientSql;
88struct ClientSql {
89 sqlite3_stmt *pStmt;
90 int bMutex;
91};
92
93typedef struct ClientCtx ClientCtx;
94struct ClientCtx {
95 sqlite3 *db; /* Database handle for this client */
96 int fd; /* Client fd */
97 int nRepeat; /* Number of times to repeat SQL */
98 int nSecond; /* Number of seconds to run for */
99 ClientSql *aPrepare; /* Array of prepared statements */
100 int nPrepare; /* Valid size of apPrepare[] */
101 int nAlloc; /* Allocated size of apPrepare[] */
102
103 int nClientThreshold; /* Threshold for checkpointing */
104 int bClientCkptRequired; /* True to do a checkpoint */
105};
106
107static int is_eol(int i){
108 return (i=='\n' || i=='\r');
109}
110static int is_whitespace(int i){
111 return (i==' ' || i=='\t' || is_eol(i));
112}
113
114/*
115** Implementation of SQL scalar function usleep().
116*/
117static void usleepFunc(
118 sqlite3_context *context,
119 int argc,
120 sqlite3_value **argv
121){
122 int nUs;
123 sqlite3_vfs *pVfs = (sqlite3_vfs*)sqlite3_user_data(context);
124 assert( argc==1 );
125 nUs = sqlite3_value_int64(argv[0]);
126 pVfs->xSleep(pVfs, nUs);
127}
128
129static void trim_string(const char **pzStr, int *pnStr){
130 const char *zStr = *pzStr;
131 int nStr = *pnStr;
132
133 while( nStr>0 && is_whitespace(zStr[0]) ){
134 zStr++;
135 nStr--;
136 }
137 while( nStr>0 && is_whitespace(zStr[nStr-1]) ){
138 nStr--;
139 }
140
141 *pzStr = zStr;
142 *pnStr = nStr;
143}
144
145static int send_message(ClientCtx *p, const char *zFmt, ...){
146 char *zMsg;
147 va_list ap; /* Vararg list */
148 va_start(ap, zFmt);
149 int res = -1;
150
151 zMsg = sqlite3_vmprintf(zFmt, ap);
152 if( zMsg ){
153 res = write(p->fd, zMsg, strlen(zMsg));
154 }
155 sqlite3_free(zMsg);
156 va_end(ap);
157
158 return (res<0);
159}
160
161static int handle_some_sql(ClientCtx *p, const char *zSql, int nSql){
162 const char *zTail = zSql;
163 int nTail = nSql;
164 int rc = SQLITE_OK;
165
166 while( rc==SQLITE_OK ){
167 if( p->nPrepare>=p->nAlloc ){
168 int nByte = (p->nPrepare+32) * sizeof(ClientSql);
169 ClientSql *aNew = sqlite3_realloc(p->aPrepare, nByte);
170 if( aNew ){
171 memset(&aNew[p->nPrepare], 0, sizeof(ClientSql)*32);
172 p->aPrepare = aNew;
173 p->nAlloc = p->nPrepare+32;
174 }else{
175 rc = SQLITE_NOMEM;
176 break;
177 }
178 }
179 rc = sqlite3_prepare_v2(
180 p->db, zTail, nTail, &p->aPrepare[p->nPrepare].pStmt, &zTail
181 );
182 if( rc!=SQLITE_OK ){
183 send_message(p, "error - %s (eec=%d)\n", sqlite3_errmsg(p->db),
184 sqlite3_extended_errcode(p->db)
185 );
186 rc = 1;
187 break;
188 }
189 if( p->aPrepare[p->nPrepare].pStmt==0 ){
190 break;
191 }
192 p->nPrepare++;
193 nTail = nSql - (zTail-zSql);
194 rc = send_message(p, "ok (%d SQL statements)\n", p->nPrepare);
195 }
196
197 return rc;
198}
199
200static sqlite3_int64 get_timer(void){
201 struct timeval t;
202 gettimeofday(&t, 0);
203 return ((sqlite3_int64)t.tv_usec / 1000) + ((sqlite3_int64)t.tv_sec * 1000);
204}
205
206static void clear_sql(ClientCtx *p){
207 int j;
208 for(j=0; j<p->nPrepare; j++){
209 sqlite3_finalize(p->aPrepare[j].pStmt);
210 }
211 p->nPrepare = 0;
212}
213
214/*
215** The sqlite3_wal_hook() callback used by all client database connections.
216*/
217static int clientWalHook(void *pArg, sqlite3 *db, const char *zDb, int nFrame){
218 if( g.nThreshold>0 ){
219 if( nFrame>=g.nThreshold ){
220 g.bCkptRequired = 1;
221 }
222 }else{
223 ClientCtx *pCtx = (ClientCtx*)pArg;
224 if( pCtx->nClientThreshold && nFrame>=pCtx->nClientThreshold ){
225 pCtx->bClientCkptRequired = 1;
226 }
227 }
228 return SQLITE_OK;
229}
230
231static int handle_run_command(ClientCtx *p){
232 int i, j;
233 int nBusy = 0;
234 sqlite3_int64 t0 = get_timer();
235 sqlite3_int64 t1 = t0;
236 int nT1 = 0;
237 int nTBusy1 = 0;
238 int rc = SQLITE_OK;
239
240 pthread_mutex_lock(&g.ckpt_mutex);
241 g.nRun++;
242 pthread_mutex_unlock(&g.ckpt_mutex);
243
244
245 for(j=0; (p->nRepeat<=0 || j<p->nRepeat) && rc==SQLITE_OK; j++){
246 sqlite3_int64 t2;
247
248 for(i=0; i<p->nPrepare && rc==SQLITE_OK; i++){
249 sqlite3_stmt *pStmt = p->aPrepare[i].pStmt;
250
251 /* If the bMutex flag is set, grab g.commit_mutex before executing
252 ** the SQL statement (which is always "COMMIT" in this case). */
253 if( p->aPrepare[i].bMutex ){
254 sqlite3_mutex_enter(g.commit_mutex);
255 }
256
257 /* Execute the statement */
258 while( sqlite3_step(pStmt)==SQLITE_ROW );
259 rc = sqlite3_reset(pStmt);
260
261 /* Relinquish the g.commit_mutex mutex if required. */
262 if( p->aPrepare[i].bMutex ){
263 sqlite3_mutex_leave(g.commit_mutex);
264 }
265
266 if( (rc & 0xFF)==SQLITE_BUSY ){
267 if( sqlite3_get_autocommit(p->db)==0 ){
268 sqlite3_exec(p->db, "ROLLBACK", 0, 0, 0);
269 }
270 nBusy++;
271 rc = SQLITE_OK;
272 break;
273 }
274 else if( rc!=SQLITE_OK ){
275 send_message(p, "error - %s (eec=%d)\n", sqlite3_errmsg(p->db),
276 sqlite3_extended_errcode(p->db)
277 );
278 }
279 }
280
281 t2 = get_timer();
282 if( t2>=(t1+1000) ){
283 int nMs = (t2 - t1);
284 int nDone = (j+1 - nBusy - nT1);
285
286 rc = send_message(
287 p, "(%d done @ %d per second, %d busy)\n",
288 nDone, (1000*nDone + nMs/2) / nMs, nBusy - nTBusy1
289 );
290 t1 = t2;
291 nT1 = j+1 - nBusy;
292 nTBusy1 = nBusy;
293 if( p->nSecond>0 && (p->nSecond*1000)<=t1-t0 ) break;
294 }
295
296 /* Global checkpoint handling. */
297 if( g.nThreshold>0 ){
298 pthread_mutex_lock(&g.ckpt_mutex);
299 if( rc==SQLITE_OK && g.bCkptRequired ){
300 if( g.nWait==g.nRun-1 ){
301 /* All other clients are already waiting on the condition variable.
302 ** Run the checkpoint, signal the condition and move on. */
303 rc = sqlite3_wal_checkpoint(p->db, "main");
304 g.bCkptRequired = 0;
305 pthread_cond_broadcast(&g.ckpt_cond);
306 }else{
307 assert( g.nWait<g.nRun-1 );
308 g.nWait++;
309 pthread_cond_wait(&g.ckpt_cond, &g.ckpt_mutex);
310 g.nWait--;
311 }
312 }
313 pthread_mutex_unlock(&g.ckpt_mutex);
314 }
315
316 if( rc==SQLITE_OK && p->bClientCkptRequired ){
317 rc = sqlite3_wal_checkpoint(p->db, "main");
318 assert( rc==SQLITE_OK );
319 p->bClientCkptRequired = 0;
320 }
321 }
322
323 if( rc==SQLITE_OK ){
324 int nMs = (int)(get_timer() - t0);
325 send_message(p, "ok (%d/%d SQLITE_BUSY)\n", nBusy, j);
326 if( p->nRepeat<=0 ){
327 send_message(p, "### ok %d busy %d ms %d\n", j-nBusy, nBusy, nMs);
328 }
329 }
330 clear_sql(p);
331
332 pthread_mutex_lock(&g.ckpt_mutex);
333 g.nRun--;
334 pthread_mutex_unlock(&g.ckpt_mutex);
335
336 return rc;
337}
338
339static int handle_dot_command(ClientCtx *p, const char *zCmd, int nCmd){
340 int n;
341 int rc = 0;
342 const char *z = &zCmd[1];
343 const char *zArg;
344 int nArg;
345
346 assert( zCmd[0]=='.' );
347 for(n=0; n<(nCmd-1); n++){
348 if( is_whitespace(z[n]) ) break;
349 }
350
351 zArg = &z[n];
352 nArg = nCmd-n;
353 trim_string(&zArg, &nArg);
354
355 if( n>=1 && n<=4 && 0==strncmp(z, "list", n) ){
356 int i;
357 for(i=0; rc==0 && i<p->nPrepare; i++){
358 const char *zSql = sqlite3_sql(p->aPrepare[i].pStmt);
359 int nSql = strlen(zSql);
360 trim_string(&zSql, &nSql);
361 rc = send_message(p, "%d: %.*s\n", i, nSql, zSql);
362 }
363 }
364
365 else if( n>=1 && n<=4 && 0==strncmp(z, "quit", n) ){
366 rc = 1;
367 }
368
369 else if( n>=2 && n<=7 && 0==strncmp(z, "repeats", n) ){
370 if( nArg ){
371 p->nRepeat = strtol(zArg, 0, 0);
372 if( p->nRepeat>0 ) p->nSecond = 0;
373 }
374 rc = send_message(p, "ok (repeat=%d)\n", p->nRepeat);
375 }
376
377 else if( n>=2 && n<=3 && 0==strncmp(z, "run", n) ){
378 rc = handle_run_command(p);
379 }
380
381 else if( n>=2 && n<=7 && 0==strncmp(z, "seconds", n) ){
382 if( nArg ){
383 p->nSecond = strtol(zArg, 0, 0);
384 if( p->nSecond>0 ) p->nRepeat = 0;
385 }
386 rc = send_message(p, "ok (seconds=%d)\n", p->nSecond);
387 }
388
389 else if( n>=1 && n<=12 && 0==strncmp(z, "mutex_commit", n) ){
390 rc = handle_some_sql(p, "COMMIT;", 7);
391 if( rc==SQLITE_OK ){
392 p->aPrepare[p->nPrepare-1].bMutex = 1;
393 }
394 }
395
396 else if( n>=1 && n<=10 && 0==strncmp(z, "checkpoint", n) ){
397 if( nArg ){
398 p->nClientThreshold = strtol(zArg, 0, 0);
399 }
400 rc = send_message(p, "ok (checkpoint=%d)\n", p->nClientThreshold);
401 }
402
403 else if( n>=2 && n<=4 && 0==strncmp(z, "stop", n) ){
404 sqlite3_close(g.db);
405 exit(0);
406 }
407
408 else{
409 send_message(p,
410 "unrecognized dot command: %.*s\n"
411 "should be \"list\", \"run\", \"repeats\", \"mutex_commit\", "
412 "\"checkpoint\" or \"seconds\"\n", n, z
413 );
414 rc = 1;
415 }
416
417 return rc;
418}
419
420static void *handle_client(void *pArg){
421 char zCmd[32*1024]; /* Read buffer */
422 int nCmd = 0; /* Valid bytes in zCmd[] */
423 int res; /* Result of read() call */
424 int rc = SQLITE_OK;
425
426 ClientCtx ctx;
427 memset(&ctx, 0, sizeof(ClientCtx));
428
429 ctx.fd = (int)(intptr_t)pArg;
430 ctx.nRepeat = 1;
431 rc = sqlite3_open_v2(g.zDatabaseName, &ctx.db,
432 SQLITE_OPEN_READWRITE|SQLITE_OPEN_CREATE, g.zVfs
433 );
434 if( rc!=SQLITE_OK ){
435 fprintf(stderr, "sqlite3_open(): %s\n", sqlite3_errmsg(ctx.db));
436 return 0;
437 }
438 sqlite3_create_function(
439 ctx.db, "usleep", 1, SQLITE_UTF8, (void*)sqlite3_vfs_find(0),
440 usleepFunc, 0, 0
441 );
442
443 /* Register the wal-hook with the new client connection */
444 sqlite3_wal_hook(ctx.db, clientWalHook, (void*)&ctx);
445
446 while( rc==SQLITE_OK ){
447 int i;
448 int iStart;
449 int nConsume;
450 res = read(ctx.fd, &zCmd[nCmd], sizeof(zCmd)-nCmd-1);
451 if( res<=0 ) break;
452 nCmd += res;
453 if( nCmd>=sizeof(zCmd)-1 ){
454 fprintf(stderr, "oversized (>32KiB) message\n");
455 res = 0;
456 break;
457 }
458 zCmd[nCmd] = '\0';
459
460 do {
461 nConsume = 0;
462
463 /* Gobble up any whitespace */
464 iStart = 0;
465 while( is_whitespace(zCmd[iStart]) ) iStart++;
466
467 if( zCmd[iStart]=='.' ){
468 /* This is a dot-command. Search for end-of-line. */
469 for(i=iStart; i<nCmd; i++){
470 if( is_eol(zCmd[i]) ){
471 rc = handle_dot_command(&ctx, &zCmd[iStart], i-iStart);
472 nConsume = i+1;
473 break;
474 }
475 }
476 }else{
477
478 int iSemi;
479 char c = 0;
480 for(iSemi=iStart; iSemi<nCmd; iSemi++){
481 if( zCmd[iSemi]==';' ){
482 c = zCmd[iSemi+1];
483 zCmd[iSemi+1] = '\0';
484 break;
485 }
486 }
487
488 if( iSemi<nCmd ){
489 if( sqlite3_complete(zCmd) ){
490 rc = handle_some_sql(&ctx, zCmd, iSemi+1);
491 nConsume = iSemi+1;
492 }
493
494 if( c ){
495 zCmd[iSemi+1] = c;
496 }
497 }
498 }
499
500 if( nConsume>0 ){
501 nCmd = nCmd-nConsume;
502 if( nCmd>0 ){
503 memmove(zCmd, &zCmd[nConsume], nCmd);
504 }
505 }
506 }while( rc==SQLITE_OK && nConsume>0 );
507 }
508
509 fprintf(stdout, "Client %d disconnects\n", ctx.fd);
510 fflush(stdout);
511 close(ctx.fd);
512 clear_sql(&ctx);
513 sqlite3_free(ctx.aPrepare);
514 sqlite3_close(ctx.db);
515 return 0;
516}
517
518static void usage(const char *zExec){
519 fprintf(stderr, "Usage: %s ?-vfs VFS? DATABASE\n", zExec);
520 exit(1);
521}
522
523int main(int argc, char *argv[]) {
524 int sfd;
525 int rc;
526 int yes = 1;
527 struct sockaddr_in server;
528 int i;
529
530 /* Ignore SIGPIPE. Otherwise the server exits if a client disconnects
531 ** abruptly. */
532 signal(SIGPIPE, SIG_IGN);
533
534 g.nThreshold = TSERVER_DEFAULT_CHECKPOINT_THRESHOLD;
535 if( (argc%2) ) usage(argv[0]);
536 for(i=1; i<(argc-1); i+=2){
537 int n = strlen(argv[i]);
538 if( n>=2 && 0==sqlite3_strnicmp("-walautocheckpoint", argv[i], n) ){
539 g.nThreshold = strtol(argv[i+1], 0, 0);
540 }else
541 if( n>=2 && 0==sqlite3_strnicmp("-vfs", argv[i], n) ){
542 g.zVfs = argv[i+1];
543 }
544 }
545 g.zDatabaseName = argv[argc-1];
546
547 g.commit_mutex = sqlite3_mutex_alloc(SQLITE_MUTEX_FAST);
548 pthread_mutex_init(&g.ckpt_mutex, 0);
549 pthread_cond_init(&g.ckpt_cond, 0);
550
551 rc = sqlite3_open_v2(g.zDatabaseName, &g.db,
552 SQLITE_OPEN_READWRITE|SQLITE_OPEN_CREATE, g.zVfs
553 );
554 if( rc!=SQLITE_OK ){
555 fprintf(stderr, "sqlite3_open(): %s\n", sqlite3_errmsg(g.db));
556 return 1;
557 }
558
559 rc = sqlite3_exec(g.db, "SELECT * FROM sqlite_master", 0, 0, 0);
560 if( rc!=SQLITE_OK ){
561 fprintf(stderr, "sqlite3_exec(): %s\n", sqlite3_errmsg(g.db));
562 return 1;
563 }
564
565 sfd = socket(AF_INET, SOCK_STREAM, 0);
566 if( sfd<0 ){
567 fprintf(stderr, "socket() failed\n");
568 return 1;
569 }
570
571 rc = setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes));
572 if( rc<0 ){
573 perror("setsockopt");
574 return 1;
575 }
576
577 memset(&server, 0, sizeof(server));
578 server.sin_family = AF_INET;
579 server.sin_addr.s_addr = inet_addr("127.0.0.1");
580 server.sin_port = htons(TSERVER_PORTNUMBER);
581
582 rc = bind(sfd, (struct sockaddr *)&server, sizeof(struct sockaddr));
583 if( rc<0 ){
584 fprintf(stderr, "bind() failed\n");
585 return 1;
586 }
587
588 rc = listen(sfd, 8);
589 if( rc<0 ){
590 fprintf(stderr, "listen() failed\n");
591 return 1;
592 }
593
594 while( 1 ){
595 pthread_t tid;
596 int cfd = accept(sfd, NULL, NULL);
597 if( cfd<0 ){
598 perror("accept()");
599 return 1;
600 }
601
602 fprintf(stdout, "Client %d connects\n", cfd);
603 fflush(stdout);
604 rc = pthread_create(&tid, NULL, handle_client, (void*)(intptr_t)cfd);
605 if( rc!=0 ){
606 perror("pthread_create()");
607 return 1;
608 }
609
610 pthread_detach(tid);
611 }
612
613 return 0;
614}