blob: e88a43d523746434e25fe67790980efdb15e021b [file] [log] [blame]
drh23669402006-01-09 17:29:52 +00001/*
2** 2005 December 14
3**
4** The author disclaims copyright to this source code. In place of
5** a legal notice, here is a blessing:
6**
7** May you do good and not evil.
8** May you find forgiveness for yourself and forgive others.
9** May you share freely, never taking more than you give.
10**
11*************************************************************************
12**
13** This file contains an example implementation of an asynchronous IO
drhfe0f75b2006-01-10 20:01:18 +000014** backend for SQLite.
15**
16** WHAT IS ASYNCHRONOUS I/O?
17**
18** With asynchronous I/O, write requests are handled by a separate thread
19** running in the background. This means that the thread that initiates
20** a database write does not have to wait for (sometimes slow) disk I/O
21** to occur. The write seems to happen very quickly, though in reality
22** it is happening at its usual slow pace in the background.
23**
** Asynchronous I/O appears to give better responsiveness, but at a price.
** You lose the Durable property. With the default I/O backend of SQLite,
** once a write completes, you know that the information you wrote is
** safely on disk. With asynchronous I/O, this is not the case. If
** your program crashes or if you suffer a power loss after the database
** write but before the asynchronous write thread has completed, then the
** database change might never make it to disk and the next user of the
** database might not see your change.
32**
** You lose Durability with asynchronous I/O, but you still retain the
** other parts of ACID: Atomic, Consistent, and Isolated. Many
** applications get along fine without Durability.
36**
37** HOW IT WORKS
38**
** Asynchronous I/O works by overloading the OS-layer disk I/O routines
** with modified versions that store the data to be written in a queue of
** pending write operations. Look at the asyncEnable() subroutine to see
** how overloading works. Six os-layer routines are overloaded:
43**
44** sqlite3OsOpenReadWrite;
45** sqlite3OsOpenReadOnly;
46** sqlite3OsOpenExclusive;
47** sqlite3OsDelete;
48** sqlite3OsFileExists;
49** sqlite3OsSyncDirectory;
50**
51** The original implementations of these routines are saved and are
52** used by the writer thread to do the real I/O. The substitute
53** implementations typically put the I/O operation on a queue
54** to be handled later by the writer thread, though read operations
55** must be handled right away, obviously.
56**
57** Asynchronous I/O is disabled by setting the os-layer interface routines
58** back to their original values.
59**
60** LIMITATIONS
61**
62** This demonstration code is deliberately kept simple in order to keep
63** the main ideas clear and easy to understand. Real applications that
64** want to do asynchronous I/O might want to add additional capabilities.
65** For example, in this demonstration if writes are happening at a steady
66** stream that exceeds the I/O capability of the background writer thread,
67** the queue of pending write operations will grow without bound until we
68** run out of memory. Users of this technique may want to keep track of
69** the quantity of pending writes and stop accepting new write requests
70** when the buffer gets to be too big.
drh23669402006-01-09 17:29:52 +000071*/
72
73#include "sqliteInt.h"
74#include "os.h"
75#include <tcl.h>
76
/* Builds that never define THREADSAFE are treated as non-threadsafe (0). */
#if !defined(THREADSAFE)
# define THREADSAFE 0
#endif
81
82/*
83** This test uses pthreads and hence only works on unix and with
drhfe0f75b2006-01-10 20:01:18 +000084** a threadsafe build of SQLite. It also requires that the redefinable
85** I/O feature of SQLite be turned on. This feature is turned off by
86** default. If a required element is missing, almost all of the code
87** in this file is commented out.
drh23669402006-01-09 17:29:52 +000088*/
89#if OS_UNIX && THREADSAFE && defined(SQLITE_ENABLE_REDEF_IO)
90
drhfe0f75b2006-01-10 20:01:18 +000091/*
92** This demo uses pthreads. If you do not have a pthreads implementation
93** for your operating system, you will need to recode the threading
94** logic.
95*/
drh23669402006-01-09 17:29:52 +000096#include <pthread.h>
97#include <sched.h>
98
/*
** Smaller and larger of two values.  Each argument may be evaluated
** twice, so never pass an expression that has side effects.  The #ifndef
** guards avoid a redefinition clash on systems whose headers already
** supply MIN and MAX.
*/
#ifndef MIN
# define MIN(x,y) ((x)<(y)?(x):(y))
#endif
#ifndef MAX
# define MAX(x,y) ((x)>(y)?(x):(y))
#endif
102
/*
** Forward declarations.  The two structures refer to each other, so their
** typedef names are introduced before either structure body is defined.
*/
typedef struct AsyncFile AsyncFile;
typedef struct AsyncWrite AsyncWrite;
106
/*
** Debug tracing.  Change the condition below from 0 to 1 to have queue
** events reported on stderr, each prefixed with the id of the calling
** thread.  The enabled form is wrapped in do{...}while(0) so that
** "TRACE(...);" is exactly one statement and remains correct inside an
** unbraced if/else.  When disabled the macro expands to nothing.
*/
#if 0
# define TRACE(X,Y) \
    do{ \
      fprintf(stderr,"THRD=%d: ", (int)pthread_self()); \
      fprintf(stderr,X,Y); \
    }while(0)
#else
# define TRACE(X,Y)  /* noop */
#endif
115
116/*
drh23669402006-01-09 17:29:52 +0000117** THREAD SAFETY NOTES
118**
119** Basic rules:
120**
121** * Both read and write access to the global write-op queue must be
122** protected by the async.queueMutex.
123**
124** * The file handles from the underlying system are assumed not to
125** be thread safe.
126**
drhfe0f75b2006-01-10 20:01:18 +0000127** * See the last two paragraphs under "The Writer Thread" for
drh23669402006-01-09 17:29:52 +0000128** an assumption to do with file-handle synchronization by the Os.
129**
130** File system operations (invoked by SQLite thread):
131**
132** xOpenXXX (three versions)
133** xDelete
134** xFileExists
135** xSyncDirectory
136**
137** File handle operations (invoked by SQLite thread):
138**
drh23669402006-01-09 17:29:52 +0000139** asyncWrite, asyncClose, asyncTruncate, asyncSync,
140** asyncSetFullSync, asyncOpenDirectory.
141**
drhfe0f75b2006-01-10 20:01:18 +0000142** The operations above add an entry to the global write-op list. They
143** prepare the entry, acquire the async.queueMutex momentarily while
144** list pointers are manipulated to insert the new entry, then release
145** the mutex and signal the writer thread to wake up in case it happens
146** to be asleep.
147**
drh23669402006-01-09 17:29:52 +0000148**
149** asyncRead, asyncFileSize.
drhfe0f75b2006-01-10 20:01:18 +0000150**
** Read operations. Both of these read from the underlying file
** first, then adjust their result based on pending writes in the
** write-op queue. So async.queueMutex is held for the duration
** of these operations to prevent other threads from changing the
** queue in mid operation.
156**
157**
158** asyncLock, asyncUnlock, asyncLockState, asyncCheckReservedLock
drh23669402006-01-09 17:29:52 +0000159**
160** These locking primitives become no-ops. Files are always opened for
drhfe0f75b2006-01-10 20:01:18 +0000161** exclusive access when using this IO backend.
162**
163**
164** asyncFileHandle.
drh23669402006-01-09 17:29:52 +0000165**
166** The sqlite3OsFileHandle() function is currently only used when
167** debugging the pager module. Unless sqlite3OsClose() is called on the
168** file (shouldn't be possible for other reasons), the underlying
169** implementations are safe to call without grabbing any mutex. So we just
drhfe0f75b2006-01-10 20:01:18 +0000170** go ahead and call it no matter what any other threads are doing.
drh23669402006-01-09 17:29:52 +0000171**
drhfe0f75b2006-01-10 20:01:18 +0000172**
173** asyncSeek.
drh23669402006-01-09 17:29:52 +0000174**
** Calling this method just manipulates the AsyncFile.iOffset variable.
** Since this variable is never accessed by the writer thread, this
** function does not require the mutex. Actual calls to OsSeek() take
** place just before OsWrite() or OsRead(), which are always protected by
** the mutex.
drh23669402006-01-09 17:29:52 +0000180**
181** The writer thread:
182**
** The async.writerMutex is used to make sure there is only
** a single writer thread running at a time.
185**
186** Inside the writer thread is a loop that works like this:
187**
188** WHILE (write-op list is not empty)
189** Do IO operation at head of write-op list
190** Remove entry from head of write-op list
191** END WHILE
192**
** The async.queueMutex is always held during the <write-op list is
** not empty> test, and when the entry is removed from the head
** of the write-op list. Sometimes it is held for the interim
** period (while the IO is performed), and sometimes it is
** relinquished. It is relinquished if (a) the IO op is an
** ASYNC_CLOSE or (b) when the file handle was opened, two of
** the underlying system's handles were opened on the same
** file-system entry.
201**
202** If condition (b) above is true, then one file-handle
203** (AsyncFile.pBaseRead) is used exclusively by sqlite threads to read the
204** file, the other (AsyncFile.pBaseWrite) by sqlite3_async_flush()
205** threads to perform write() operations. This means that read
206** operations are not blocked by asynchronous writes (although
207** asynchronous writes may still be blocked by reads).
208**
209** This assumes that the OS keeps two handles open on the same file
210** properly in sync. That is, any read operation that starts after a
211** write operation on the same file system entry has completed returns
212** data consistent with the write. We also assume that if one thread
213** reads a file while another is writing it all bytes other than the
214** ones actually being written contain valid data.
215**
216** If the above assumptions are not true, set the preprocessor symbol
217** SQLITE_ASYNC_TWO_FILEHANDLES to 0.
218*/
219
/*
** Compile-time switch.  The default of 1 lets asyncOpenFile() open a
** second handle, used for writing, on files opened read/write.  Define
** it as 0 to use a single handle per file instead.
*/
#if !defined(SQLITE_ASYNC_TWO_FILEHANDLES)
# define SQLITE_ASYNC_TWO_FILEHANDLES 1
#endif
224
225/*
226** State information is held in the static variable "async" defined
227** as follows:
228*/
229static struct TestAsyncStaticData {
230 pthread_mutex_t queueMutex; /* Mutex for access to write operation queue */
231 pthread_mutex_t writerMutex; /* Prevents multiple writer threads */
232 pthread_cond_t queueSignal; /* For waking up sleeping writer thread */
233 pthread_cond_t emptySignal; /* Notify when the write queue is empty */
234 AsyncWrite *pQueueFirst; /* Next write operation to be processed */
235 AsyncWrite *pQueueLast; /* Last write operation on the list */
236 volatile int ioDelay; /* Extra delay between write operations */
237 volatile int writerHaltWhenIdle; /* Writer thread halts when queue empty */
238 volatile int writerHaltNow; /* Writer thread halts after next op */
239} async = {
240 PTHREAD_MUTEX_INITIALIZER,
241 PTHREAD_MUTEX_INITIALIZER,
242 PTHREAD_COND_INITIALIZER,
243 PTHREAD_COND_INITIALIZER,
244};
245
/*
** Operation codes stored in AsyncWrite.op.
**
** Codes 1 through 6 are queued by the methods of an open AsyncFile.
** Codes 7 through 9 are queued by the file-system level routines.
** ASYNC_NOOP is never queued directly; the writer thread substitutes it
** for an operation on a file that has already recorded an I/O error.
*/
#define ASYNC_NOOP           0

#define ASYNC_WRITE          1
#define ASYNC_SYNC           2
#define ASYNC_TRUNCATE       3
#define ASYNC_CLOSE          4
#define ASYNC_OPENDIRECTORY  5
#define ASYNC_SETFULLSYNC    6

#define ASYNC_DELETE         7
#define ASYNC_OPENEXCLUSIVE  8
#define ASYNC_SYNCDIRECTORY  9
258
259/*
drhfe0f75b2006-01-10 20:01:18 +0000260** Entries on the write-op queue are instances of the AsyncWrite
261** structure, defined here.
262**
drh23669402006-01-09 17:29:52 +0000263** The interpretation of the iOffset and nByte variables varies depending
264** on the value of AsyncWrite.op:
265**
266** ASYNC_WRITE:
267** iOffset -> Offset in file to write to.
268** nByte -> Number of bytes of data to write (pointed to by zBuf).
269**
270** ASYNC_SYNC:
271** iOffset -> Unused.
272** nByte -> Value of "fullsync" flag to pass to sqlite3OsSync().
273**
274** ASYNC_TRUNCATE:
275** iOffset -> Size to truncate file to.
276** nByte -> Unused.
277**
278** ASYNC_CLOSE:
279** iOffset -> Unused.
280** nByte -> Unused.
281**
282** ASYNC_OPENDIRECTORY:
283** iOffset -> Unused.
284** nByte -> Number of bytes of zBuf points to (directory name).
285**
286** ASYNC_SETFULLSYNC:
287** iOffset -> Unused.
288** nByte -> New value for the full-sync flag.
289**
290**
291** ASYNC_DELETE:
292** iOffset -> Unused.
293** nByte -> Number of bytes of zBuf points to (file name).
294**
295** ASYNC_OPENEXCLUSIVE:
296** iOffset -> Value of "delflag".
297** nByte -> Number of bytes of zBuf points to (file name).
298**
299** For an ASYNC_WRITE operation, zBuf points to the data to write to the file.
300** This space is sqliteMalloc()d along with the AsyncWrite structure in a
301** single blob, so is deleted when sqliteFree() is called on the parent
302** structure.
303*/
304struct AsyncWrite {
305 AsyncFile *pFile; /* File to write data to or sync */
306 int op; /* One of ASYNC_xxx etc. */
307 i64 iOffset; /* See above */
308 int nByte; /* See above */
309 char *zBuf; /* Data to write to file (or NULL if op!=ASYNC_WRITE) */
310 AsyncWrite *pNext; /* Next write operation (to any file) */
311};
312
313/*
314** The AsyncFile structure is a subclass of OsFile used for asynchronous IO.
315*/
316struct AsyncFile {
317 IoMethod *pMethod; /* Must be first */
drh4b74b262006-02-13 13:50:55 +0000318 int ioError; /* Value of any asychronous error we have seen */ i64 iOffset; /* Current seek() offset in file */
drh23669402006-01-09 17:29:52 +0000319 OsFile *pBaseRead; /* Read handle to the underlying Os file */
320 OsFile *pBaseWrite; /* Write handle to the underlying Os file */
321};
322
323/*
324** Add an entry to the end of the global write-op list. pWrite should point
drhfe0f75b2006-01-10 20:01:18 +0000325** to an AsyncWrite structure allocated using sqlite3OsMalloc(). The writer
326** thread will call sqlite3OsFree() to free the structure after the specified
327** operation has been completed.
drh23669402006-01-09 17:29:52 +0000328**
drhfe0f75b2006-01-10 20:01:18 +0000329** Once an AsyncWrite structure has been added to the list, it becomes the
330** property of the writer thread and must not be read or modified by the
331** caller.
drh23669402006-01-09 17:29:52 +0000332*/
333static void addAsyncWrite(AsyncWrite *pWrite){
drhfe0f75b2006-01-10 20:01:18 +0000334 /* We must hold the queue mutex in order to modify the queue pointers */
drh23669402006-01-09 17:29:52 +0000335 pthread_mutex_lock(&async.queueMutex);
drhfe0f75b2006-01-10 20:01:18 +0000336
337 /* Add the record to the end of the write-op queue */
drh23669402006-01-09 17:29:52 +0000338 assert( !pWrite->pNext );
339 if( async.pQueueLast ){
340 assert( async.pQueueFirst );
341 async.pQueueLast->pNext = pWrite;
342 }else{
343 async.pQueueFirst = pWrite;
344 }
345 async.pQueueLast = pWrite;
346 TRACE("PUSH %p\n", pWrite);
drhfe0f75b2006-01-10 20:01:18 +0000347
348 /* Drop the queue mutex */
drh23669402006-01-09 17:29:52 +0000349 pthread_mutex_unlock(&async.queueMutex);
drhfe0f75b2006-01-10 20:01:18 +0000350
351 /* The writer thread might have been idle because there was nothing
352 ** on the write-op queue for it to do. So wake it up. */
drh23669402006-01-09 17:29:52 +0000353 pthread_cond_signal(&async.queueSignal);
354}
355
356/*
357** This is a utility function to allocate and populate a new AsyncWrite
358** structure and insert it (via addAsyncWrite() ) into the global list.
359*/
360static int addNewAsyncWrite(
361 AsyncFile *pFile,
362 int op,
363 i64 iOffset,
364 int nByte,
365 const char *zByte
366){
drh4b74b262006-02-13 13:50:55 +0000367 AsyncWrite *p;
368 if( pFile && pFile->ioError!=SQLITE_OK ){
369 return pFile->ioError;
370 }
371 p = sqlite3OsMalloc(sizeof(AsyncWrite) + (zByte?nByte:0));
drh23669402006-01-09 17:29:52 +0000372 if( !p ){
373 return SQLITE_NOMEM;
374 }
375 p->op = op;
376 p->iOffset = iOffset;
377 p->nByte = nByte;
378 p->pFile = pFile;
379 p->pNext = 0;
380 if( zByte ){
381 p->zBuf = (char *)&p[1];
382 memcpy(p->zBuf, zByte, nByte);
383 }else{
384 p->zBuf = 0;
385 }
386 addAsyncWrite(p);
387 return SQLITE_OK;
388}
389
390/*
391** Close the file. This just adds an entry to the write-op list, the file is
392** not actually closed.
393*/
394static int asyncClose(OsFile **pId){
395 return addNewAsyncWrite((AsyncFile *)*pId, ASYNC_CLOSE, 0, 0, 0);
396}
397
398/*
399** Implementation of sqlite3OsWrite() for asynchronous files. Instead of
400** writing to the underlying file, this function adds an entry to the end of
401** the global AsyncWrite list. Either SQLITE_OK or SQLITE_NOMEM may be
402** returned.
403*/
404static int asyncWrite(OsFile *id, const void *pBuf, int amt){
405 AsyncFile *pFile = (AsyncFile *)id;
406 int rc = addNewAsyncWrite(pFile, ASYNC_WRITE, pFile->iOffset, amt, pBuf);
407 pFile->iOffset += (i64)amt;
408 return rc;
409}
410
411/*
412** Truncate the file to nByte bytes in length. This just adds an entry to
413** the write-op list, no IO actually takes place.
414*/
415static int asyncTruncate(OsFile *id, i64 nByte){
416 return addNewAsyncWrite((AsyncFile *)id, ASYNC_TRUNCATE, nByte, 0, 0);
417}
418
419/*
420** Open the directory identified by zName and associate it with the
421** specified file. This just adds an entry to the write-op list, the
422** directory is opened later by sqlite3_async_flush().
423*/
424static int asyncOpenDirectory(OsFile *id, const char *zName){
425 AsyncFile *pFile = (AsyncFile *)id;
426 return addNewAsyncWrite(pFile, ASYNC_OPENDIRECTORY, 0, strlen(zName)+1,zName);
427}
428
429/*
430** Sync the file. This just adds an entry to the write-op list, the
431** sync() is done later by sqlite3_async_flush().
432*/
433static int asyncSync(OsFile *id, int fullsync){
434 return addNewAsyncWrite((AsyncFile *)id, ASYNC_SYNC, 0, fullsync, 0);
435}
436
437/*
438** Set (or clear) the full-sync flag on the underlying file. This operation
439** is queued and performed later by sqlite3_async_flush().
440*/
441static void asyncSetFullSync(OsFile *id, int value){
442 addNewAsyncWrite((AsyncFile *)id, ASYNC_SETFULLSYNC, 0, value, 0);
443}
444
445/*
446** Read data from the file. First we read from the filesystem, then adjust
447** the contents of the buffer based on ASYNC_WRITE operations in the
drhfe0f75b2006-01-10 20:01:18 +0000448** write-op queue.
drh23669402006-01-09 17:29:52 +0000449**
450** This method holds the mutex from start to finish.
451*/
452static int asyncRead(OsFile *id, void *obuf, int amt){
453 int rc = SQLITE_OK;
454 i64 filesize;
455 int nRead;
456 AsyncFile *pFile = (AsyncFile *)id;
457
drh4b74b262006-02-13 13:50:55 +0000458 /* If an I/O error has previously occurred on this file, then all
459 ** subsequent operations fail.
460 */
461 if( pFile->ioError!=SQLITE_OK ){
462 return pFile->ioError;
463 }
464
drh23669402006-01-09 17:29:52 +0000465 /* Grab the write queue mutex for the duration of the call */
466 pthread_mutex_lock(&async.queueMutex);
467
468 if( pFile->pBaseRead ){
469 rc = sqlite3OsFileSize(pFile->pBaseRead, &filesize);
470 if( rc!=SQLITE_OK ){
471 goto asyncread_out;
472 }
473 rc = sqlite3OsSeek(pFile->pBaseRead, pFile->iOffset);
474 if( rc!=SQLITE_OK ){
475 goto asyncread_out;
476 }
477 nRead = MIN(filesize - pFile->iOffset, amt);
478 if( nRead>0 ){
479 rc = sqlite3OsRead(((AsyncFile *)id)->pBaseRead, obuf, nRead);
480 }
481 }
482
483 if( rc==SQLITE_OK ){
484 AsyncWrite *p;
485 i64 iOffset = pFile->iOffset; /* Current seek offset */
486
487 for(p=async.pQueueFirst; p; p = p->pNext){
488 if( p->pFile==pFile && p->op==ASYNC_WRITE ){
drh44528382006-02-13 13:30:19 +0000489 int iBeginOut = (p->iOffset - iOffset);
490 int iBeginIn = -iBeginOut;
drh23669402006-01-09 17:29:52 +0000491 int nCopy;
492
493 if( iBeginIn<0 ) iBeginIn = 0;
494 if( iBeginOut<0 ) iBeginOut = 0;
495 nCopy = MIN(p->nByte-iBeginIn, amt-iBeginOut);
496
497 if( nCopy>0 ){
498 memcpy(&((char *)obuf)[iBeginOut], &p->zBuf[iBeginIn], nCopy);
499 }
500 }
501 }
502
503 pFile->iOffset += (i64)amt;
504 }
505
506asyncread_out:
507 pthread_mutex_unlock(&async.queueMutex);
508 return rc;
509}
510
511/*
512** Seek to the specified offset. This just adjusts the AsyncFile.iOffset
513** variable - calling seek() on the underlying file is defered until the
514** next read() or write() operation.
515*/
516static int asyncSeek(OsFile *id, i64 offset){
517 AsyncFile *pFile = (AsyncFile *)id;
518 pFile->iOffset = offset;
519 return SQLITE_OK;
520}
521
522/*
523** Read the size of the file. First we read the size of the file system
524** entry, then adjust for any ASYNC_WRITE or ASYNC_TRUNCATE operations
525** currently in the write-op list.
526**
527** This method holds the mutex from start to finish.
528*/
529int asyncFileSize(OsFile *id, i64 *pSize){
530 int rc = SQLITE_OK;
531 i64 s = 0;
532 OsFile *pBase;
533
534 pthread_mutex_lock(&async.queueMutex);
535
536 /* Read the filesystem size from the base file. If pBaseRead is NULL, this
537 ** means the file hasn't been opened yet. In this case all relevant data
538 ** must be in the write-op queue anyway, so we can omit reading from the
539 ** file-system.
540 */
541 pBase = ((AsyncFile *)id)->pBaseRead;
542 if( pBase ){
543 rc = sqlite3OsFileSize(pBase, &s);
544 }
545
546 if( rc==SQLITE_OK ){
547 AsyncWrite *p;
548 for(p=async.pQueueFirst; p; p = p->pNext){
549 if( p->pFile==(AsyncFile *)id ){
550 switch( p->op ){
551 case ASYNC_WRITE:
552 s = MAX(p->iOffset + (i64)(p->nByte), s);
553 break;
554 case ASYNC_TRUNCATE:
555 s = MIN(s, p->nByte);
556 break;
557 }
558 }
559 }
560 *pSize = s;
561 }
562 pthread_mutex_unlock(&async.queueMutex);
563 return rc;
564}
565
566/*
567** Return the operating system file handle. This is only used for debugging
568** at the moment anyway.
569*/
570static int asyncFileHandle(OsFile *id){
571 return sqlite3OsFileHandle(((AsyncFile *)id)->pBaseRead);
572}
573
drhfe0f75b2006-01-10 20:01:18 +0000574/*
575** No file locking occurs with this version of the asynchronous backend.
576** So the locking routines are no-ops.
577*/
drh23669402006-01-09 17:29:52 +0000578static int asyncLock(OsFile *id, int lockType){
579 return SQLITE_OK;
580}
581static int asyncUnlock(OsFile *id, int lockType){
582 return SQLITE_OK;
583}
584
585/*
586** This function is called when the pager layer first opens a database file
587** and is checking for a hot-journal.
588*/
589static int asyncCheckReservedLock(OsFile *id){
590 return SQLITE_OK;
591}
592
593/*
594** This is broken. But sqlite3OsLockState() is only used for testing anyway.
595*/
596static int asyncLockState(OsFile *id){
597 return SQLITE_OK;
598}
599
600/*
601** The following variables hold pointers to the original versions of
drhfe0f75b2006-01-10 20:01:18 +0000602** OS-layer interface routines that are overloaded in order to create
603** the asynchronous I/O backend.
drh23669402006-01-09 17:29:52 +0000604*/
605static int (*xOrigOpenReadWrite)(const char*, OsFile**, int*) = 0;
606static int (*xOrigOpenExclusive)(const char*, OsFile**, int) = 0;
607static int (*xOrigOpenReadOnly)(const char*, OsFile**) = 0;
608static int (*xOrigDelete)(const char*) = 0;
609static int (*xOrigFileExists)(const char*) = 0;
610static int (*xOrigSyncDirectory)(const char*) = 0;
611
drhfe0f75b2006-01-10 20:01:18 +0000612/*
613** This routine does most of the work of opening a file and building
614** the OsFile structure.
615*/
drh23669402006-01-09 17:29:52 +0000616static int asyncOpenFile(
drhfe0f75b2006-01-10 20:01:18 +0000617 const char *zName, /* The name of the file to be opened */
618 OsFile **pFile, /* Put the OsFile structure here */
619 OsFile *pBaseRead, /* The real OsFile from the real I/O routine */
620 int openForWriting /* Open a second file handle for writing if true */
drh23669402006-01-09 17:29:52 +0000621){
622 int rc;
623 AsyncFile *p;
624 OsFile *pBaseWrite = 0;
625
626 static IoMethod iomethod = {
627 asyncClose,
628 asyncOpenDirectory,
629 asyncRead,
630 asyncWrite,
631 asyncSeek,
632 asyncTruncate,
633 asyncSync,
634 asyncSetFullSync,
635 asyncFileHandle,
636 asyncFileSize,
637 asyncLock,
638 asyncUnlock,
639 asyncLockState,
640 asyncCheckReservedLock
641 };
642
drhfe0f75b2006-01-10 20:01:18 +0000643 if( openForWriting && SQLITE_ASYNC_TWO_FILEHANDLES ){
drh23669402006-01-09 17:29:52 +0000644 int dummy;
645 rc = xOrigOpenReadWrite(zName, &pBaseWrite, &dummy);
646 if( rc!=SQLITE_OK ){
647 goto error_out;
648 }
649 }
650
651 p = (AsyncFile *)sqlite3OsMalloc(sizeof(AsyncFile));
652 if( !p ){
653 rc = SQLITE_NOMEM;
654 goto error_out;
655 }
656 memset(p, 0, sizeof(AsyncFile));
657
658 p->pMethod = &iomethod;
659 p->pBaseRead = pBaseRead;
660 p->pBaseWrite = pBaseWrite;
drh4b74b262006-02-13 13:50:55 +0000661 p->ioError = SQLITE_OK;
drh23669402006-01-09 17:29:52 +0000662
663 *pFile = (OsFile *)p;
664 return SQLITE_OK;
665
666error_out:
667 assert(!p);
668 sqlite3OsClose(&pBaseRead);
669 sqlite3OsClose(&pBaseWrite);
670 *pFile = 0;
671 return rc;
672}
673
674/*
675** The async-IO backends implementation of the three functions used to open
676** a file (xOpenExclusive, xOpenReadWrite and xOpenReadOnly). Most of the
677** work is done in function asyncOpenFile() - see above.
678*/
679static int asyncOpenExclusive(const char *z, OsFile **ppFile, int delFlag){
680 int rc = asyncOpenFile(z, ppFile, 0, 0);
681 if( rc==SQLITE_OK ){
682 AsyncFile *pFile = (AsyncFile *)(*ppFile);
683 int nByte = strlen(z)+1;
684 i64 i = (i64)(delFlag);
685 rc = addNewAsyncWrite(pFile, ASYNC_OPENEXCLUSIVE, i, nByte, z);
686 if( rc!=SQLITE_OK ){
687 sqlite3OsFree(pFile);
688 *ppFile = 0;
689 }
690 }
691 return rc;
692}
693static int asyncOpenReadOnly(const char *z, OsFile **ppFile){
694 OsFile *pBase = 0;
695 int rc = xOrigOpenReadOnly(z, &pBase);
696 if( rc==SQLITE_OK ){
697 rc = asyncOpenFile(z, ppFile, pBase, 0);
698 }
699 return rc;
700}
701static int asyncOpenReadWrite(const char *z, OsFile **ppFile, int *pReadOnly){
702 OsFile *pBase = 0;
703 int rc = xOrigOpenReadWrite(z, &pBase, pReadOnly);
704 if( rc==SQLITE_OK ){
705 rc = asyncOpenFile(z, ppFile, pBase, (*pReadOnly ? 0 : 1));
706 }
707 return rc;
708}
709
710/*
711** Implementation of sqlite3OsDelete. Add an entry to the end of the
712** write-op queue to perform the delete.
713*/
714static int asyncDelete(const char *z){
715 return addNewAsyncWrite(0, ASYNC_DELETE, 0, strlen(z)+1, z);
716}
717
718/*
719** Implementation of sqlite3OsSyncDirectory. Add an entry to the end of the
720** write-op queue to perform the directory sync.
721*/
722static int asyncSyncDirectory(const char *z){
723 return addNewAsyncWrite(0, ASYNC_SYNCDIRECTORY, 0, strlen(z)+1, z);
724}
725
726/*
727** Implementation of sqlite3OsFileExists. Return true if file 'z' exists
728** in the file system.
729**
730** This method holds the mutex from start to finish.
731*/
732static int asyncFileExists(const char *z){
733 int ret;
734 AsyncWrite *p;
735
736 pthread_mutex_lock(&async.queueMutex);
737
738 /* See if the real file system contains the specified file. */
739 ret = xOrigFileExists(z);
740
741 for(p=async.pQueueFirst; p; p = p->pNext){
742 if( p->op==ASYNC_DELETE && 0==strcmp(p->zBuf, z) ){
743 ret = 0;
744 }else if( p->op==ASYNC_OPENEXCLUSIVE && 0==strcmp(p->zBuf, z) ){
745 ret = 1;
746 }
747 }
748
749 pthread_mutex_unlock(&async.queueMutex);
750 return ret;
751}
752
753/*
754** Call this routine to enable or disable the
755** asynchronous IO features implemented in this file.
756**
757** This routine is not even remotely threadsafe. Do not call
758** this routine while any SQLite database connections are open.
759*/
760static void asyncEnable(int enable){
761 if( enable && xOrigOpenReadWrite==0 ){
762 xOrigOpenReadWrite = sqlite3Os.xOpenReadWrite;
763 xOrigOpenReadOnly = sqlite3Os.xOpenReadOnly;
764 xOrigOpenExclusive = sqlite3Os.xOpenExclusive;
765 xOrigDelete = sqlite3Os.xDelete;
766 xOrigFileExists = sqlite3Os.xFileExists;
767 xOrigSyncDirectory = sqlite3Os.xSyncDirectory;
768
769 sqlite3Os.xOpenReadWrite = asyncOpenReadWrite;
770 sqlite3Os.xOpenReadOnly = asyncOpenReadOnly;
771 sqlite3Os.xOpenExclusive = asyncOpenExclusive;
772 sqlite3Os.xDelete = asyncDelete;
773 sqlite3Os.xFileExists = asyncFileExists;
774 sqlite3Os.xSyncDirectory = asyncSyncDirectory;
775 }
776 if( !enable && xOrigOpenReadWrite!=0 ){
777 sqlite3Os.xOpenReadWrite = xOrigOpenReadWrite;
778 sqlite3Os.xOpenReadOnly = xOrigOpenReadOnly;
779 sqlite3Os.xOpenExclusive = xOrigOpenExclusive;
780 sqlite3Os.xDelete = xOrigDelete;
781 sqlite3Os.xFileExists = xOrigFileExists;
782 sqlite3Os.xSyncDirectory = xOrigSyncDirectory;
783
784 xOrigOpenReadWrite = 0;
785 xOrigOpenReadOnly = 0;
786 xOrigOpenExclusive = 0;
787 xOrigDelete = 0;
788 xOrigFileExists = 0;
789 xOrigSyncDirectory = 0;
790 }
791}
792
793/*
794** This procedure runs in a separate thread, reading messages off of the
795** write queue and processing them one by one.
796**
797** If async.writerHaltNow is true, then this procedure exits
798** after processing a single message.
799**
800** If async.writerHaltWhenIdle is true, then this procedure exits when
801** the write queue is empty.
802**
** If both of the above variables are false, this procedure runs
** indefinitely, waiting for operations to be added to the write queue
** and processing them in the order in which they arrive.
**
** An artificial delay of async.ioDelay milliseconds is inserted before
** each write operation in order to simulate the effect of a slow disk.
809**
810** Only one instance of this procedure may be running at a time.
811*/
812static void *asyncWriterThread(void *NotUsed){
813 AsyncWrite *p = 0;
814 int rc = SQLITE_OK;
815
816 if( pthread_mutex_trylock(&async.writerMutex) ){
817 return 0;
818 }
819 while( async.writerHaltNow==0 ){
820 int holdingMutex;
821 OsFile *pBase = 0;
822
823 pthread_mutex_lock(&async.queueMutex);
824 holdingMutex = 1;
825 while( (p = async.pQueueFirst)==0 ){
826 pthread_cond_broadcast(&async.emptySignal);
827 if( async.writerHaltWhenIdle ){
828 pthread_mutex_unlock(&async.queueMutex);
829 break;
830 }else{
831 TRACE("IDLE\n", 0);
832 pthread_cond_wait(&async.queueSignal, &async.queueMutex);
833 TRACE("WAKEUP\n", 0);
834 }
835 }
836 if( p==0 ) break;
837 TRACE("PROCESSING %p\n", p);
838
839 /* Right now this thread is holding the mutex on the write-op queue.
840 ** Variable 'p' points to the first entry in the write-op queue. In
841 ** the general case, we hold on to the mutex for the entire body of
842 ** the loop.
843 **
844 ** However in the cases enumerated below, we relinquish the mutex,
845 ** perform the IO, and then re-request the mutex before removing 'p' from
846 ** the head of the write-op queue. The idea is to increase concurrency with
847 ** sqlite threads.
848 **
849 ** * An ASYNC_CLOSE operation.
850 ** * An ASYNC_OPENEXCLUSIVE operation. For this one, we relinquish
851 ** the mutex, call the underlying xOpenExclusive() function, then
852 ** re-aquire the mutex before seting the AsyncFile.pBaseRead
853 ** variable.
854 ** * ASYNC_SYNC and ASYNC_WRITE operations, if
855 ** SQLITE_ASYNC_TWO_FILEHANDLES was set at compile time and two
856 ** file-handles are open for the particular file being "synced".
857 */
858 if( p->pFile ){
859 pBase = p->pFile->pBaseWrite;
drh4b74b262006-02-13 13:50:55 +0000860 if( p->pFile->ioError!=SQLITE_OK && p->op!=ASYNC_CLOSE ){
861 p->op = ASYNC_NOOP;
862 }
drh23669402006-01-09 17:29:52 +0000863 if(
864 p->op==ASYNC_CLOSE ||
865 p->op==ASYNC_OPENEXCLUSIVE ||
866 (pBase && (p->op==ASYNC_SYNC || p->op==ASYNC_WRITE) )
867 ){
868 pthread_mutex_unlock(&async.queueMutex);
869 holdingMutex = 0;
870 }
871 if( !pBase ){
872 pBase = p->pFile->pBaseRead;
873 }
874 }
875
876 switch( p->op ){
drh4b74b262006-02-13 13:50:55 +0000877 case ASYNC_NOOP:
878 break;
879
drh23669402006-01-09 17:29:52 +0000880 case ASYNC_WRITE:
881 assert( pBase );
882 rc = sqlite3OsSeek(pBase, p->iOffset);
883 if( rc==SQLITE_OK ){
884 rc = sqlite3OsWrite(pBase, (const void *)(p->zBuf), p->nByte);
885 }
886 break;
887
888 case ASYNC_SYNC:
889 assert( pBase );
890 rc = sqlite3OsSync(pBase, p->nByte);
891 break;
892
893 case ASYNC_TRUNCATE:
894 assert( pBase );
895 rc = sqlite3OsTruncate(pBase, p->nByte);
896 break;
897
898 case ASYNC_CLOSE:
899 sqlite3OsClose(&p->pFile->pBaseRead);
900 sqlite3OsClose(&p->pFile->pBaseWrite);
901 sqlite3OsFree(p->pFile);
902 break;
903
904 case ASYNC_OPENDIRECTORY:
905 assert( pBase );
906 sqlite3OsOpenDirectory(pBase, p->zBuf);
907 break;
908
909 case ASYNC_SETFULLSYNC:
910 assert( pBase );
911 sqlite3OsSetFullSync(pBase, p->nByte);
912 break;
913
914 case ASYNC_DELETE:
915 rc = xOrigDelete(p->zBuf);
916 break;
917
918 case ASYNC_SYNCDIRECTORY:
919 rc = xOrigSyncDirectory(p->zBuf);
920 break;
921
922 case ASYNC_OPENEXCLUSIVE: {
923 AsyncFile *pFile = p->pFile;
924 int delFlag = ((p->iOffset)?1:0);
925 OsFile *pBase = 0;
926 rc = xOrigOpenExclusive(p->zBuf, &pBase, delFlag);
927 assert( holdingMutex==0 );
928 pthread_mutex_lock(&async.queueMutex);
929 holdingMutex = 1;
930 if( rc==SQLITE_OK ){
931 pFile->pBaseRead = pBase;
932 }
933 break;
934 }
935
936 default: assert(!"Illegal value for AsyncWrite.op");
937 }
938
drh4b74b262006-02-13 13:50:55 +0000939 /* If an error happens, store the error code in the pFile.ioError
940 ** field. This will prevent any future operations on that file,
941 ** other than closing it.
942 **
943 ** We cannot report the error back to the connection that requested
944 ** the I/O since the error happened asynchronously. The connection has
945 ** already moved on. There really is nobody to report the error to.
946 */
947 if( rc!=SQLITE_OK ){
948 p->pFile->ioError = rc;
949 rc = SQLITE_OK;
950 }
951
drh23669402006-01-09 17:29:52 +0000952 /* If we didn't hang on to the mutex during the IO op, obtain it now
953 ** so that the AsyncWrite structure can be safely removed from the
954 ** global write-op queue.
955 */
956 if( !holdingMutex ){
957 pthread_mutex_lock(&async.queueMutex);
958 holdingMutex = 1;
959 }
960 TRACE("UNLINK %p\n", p);
drh4b74b262006-02-13 13:50:55 +0000961 if( p==async.pQueueLast ){
962 async.pQueueLast = 0;
drh23669402006-01-09 17:29:52 +0000963 }
drh4b74b262006-02-13 13:50:55 +0000964 async.pQueueFirst = p->pNext;
drh5c323542006-02-13 13:23:57 +0000965 sqlite3OsFree(p);
drh23669402006-01-09 17:29:52 +0000966 assert( holdingMutex );
967
968 /* Drop the queue mutex before continuing to the next write operation
969 ** in order to give other threads a chance to work with the write queue.
970 */
971 pthread_mutex_unlock(&async.queueMutex);
972 if( async.ioDelay>0 ){
973 sqlite3OsSleep(async.ioDelay);
974 }else{
975 sched_yield();
976 }
977 }
978 pthread_mutex_unlock(&async.writerMutex);
979 return 0;
980}
981
982/**************************************************************************
983** The remaining code defines a Tcl interface for testing the asynchronous
984** IO implementation in this file.
985**
986** To adapt the code to a non-TCL environment, delete or comment out
987** the code that follows.
988*/
989
990/*
991** sqlite3async_enable ?YES/NO?
992**
993** Enable or disable the asynchronous I/O backend. This command is
994** not thread-safe. Do not call it while any database connections
995** are open.
996*/
997static int testAsyncEnable(
998 void * clientData,
999 Tcl_Interp *interp,
1000 int objc,
1001 Tcl_Obj *CONST objv[]
1002){
1003 if( objc!=1 && objc!=2 ){
1004 Tcl_WrongNumArgs(interp, 1, objv, "?YES/NO?");
1005 return TCL_ERROR;
1006 }
1007 if( objc==1 ){
1008 Tcl_SetObjResult(interp, Tcl_NewBooleanObj(xOrigOpenReadWrite!=0));
1009 }else{
1010 int en;
1011 if( Tcl_GetBooleanFromObj(interp, objv[1], &en) ) return TCL_ERROR;
1012 asyncEnable(en);
1013 }
1014 return TCL_OK;
1015}
1016
1017/*
1018** sqlite3async_halt "now"|"idle"|"never"
1019**
1020** Set the conditions at which the writer thread will halt.
1021*/
1022static int testAsyncHalt(
1023 void * clientData,
1024 Tcl_Interp *interp,
1025 int objc,
1026 Tcl_Obj *CONST objv[]
1027){
1028 const char *zCond;
1029 if( objc!=2 ){
1030 Tcl_WrongNumArgs(interp, 1, objv, "\"now\"|\"idle\"|\"never\"");
1031 return TCL_ERROR;
1032 }
1033 zCond = Tcl_GetString(objv[1]);
1034 if( strcmp(zCond, "now")==0 ){
1035 async.writerHaltNow = 1;
1036 pthread_cond_broadcast(&async.queueSignal);
1037 }else if( strcmp(zCond, "idle")==0 ){
1038 async.writerHaltWhenIdle = 1;
1039 async.writerHaltNow = 0;
1040 pthread_cond_broadcast(&async.queueSignal);
1041 }else if( strcmp(zCond, "never")==0 ){
1042 async.writerHaltWhenIdle = 0;
1043 async.writerHaltNow = 0;
1044 }else{
1045 Tcl_AppendResult(interp,
1046 "should be one of: \"now\", \"idle\", or \"never\"", (char*)0);
1047 return TCL_ERROR;
1048 }
1049 return TCL_OK;
1050}
1051
1052/*
1053** sqlite3async_delay ?MS?
1054**
1055** Query or set the number of milliseconds of delay in the writer
1056** thread after each write operation. The default is 0. By increasing
1057** the memory delay we can simulate the effect of slow disk I/O.
1058*/
1059static int testAsyncDelay(
1060 void * clientData,
1061 Tcl_Interp *interp,
1062 int objc,
1063 Tcl_Obj *CONST objv[]
1064){
1065 if( objc!=1 && objc!=2 ){
1066 Tcl_WrongNumArgs(interp, 1, objv, "?MS?");
1067 return TCL_ERROR;
1068 }
1069 if( objc==1 ){
1070 Tcl_SetObjResult(interp, Tcl_NewIntObj(async.ioDelay));
1071 }else{
1072 int ioDelay;
1073 if( Tcl_GetIntFromObj(interp, objv[1], &ioDelay) ) return TCL_ERROR;
1074 async.ioDelay = ioDelay;
1075 }
1076 return TCL_OK;
1077}
1078
1079/*
1080** sqlite3async_start
1081**
1082** Start a new writer thread.
1083*/
1084static int testAsyncStart(
1085 void * clientData,
1086 Tcl_Interp *interp,
1087 int objc,
1088 Tcl_Obj *CONST objv[]
1089){
1090 pthread_t x;
1091 int rc;
1092 rc = pthread_create(&x, 0, asyncWriterThread, 0);
1093 if( rc ){
1094 Tcl_AppendResult(interp, "failed to create the thread", 0);
1095 return TCL_ERROR;
1096 }
1097 pthread_detach(x);
1098 return TCL_OK;
1099}
1100
1101/*
1102** sqlite3async_wait
1103**
1104** Wait for the current writer thread to terminate.
1105**
1106** If the current writer thread is set to run forever then this
1107** command would block forever. To prevent that, an error is returned.
1108*/
1109static int testAsyncWait(
1110 void * clientData,
1111 Tcl_Interp *interp,
1112 int objc,
1113 Tcl_Obj *CONST objv[]
1114){
1115 if( async.writerHaltNow==0 && async.writerHaltWhenIdle==0 ){
1116 Tcl_AppendResult(interp, "would block forever", (char*)0);
1117 return TCL_ERROR;
1118 }
1119 TRACE("WAIT\n",0);
1120 pthread_cond_broadcast(&async.queueSignal);
1121 pthread_mutex_lock(&async.writerMutex);
1122 pthread_mutex_unlock(&async.writerMutex);
1123 return TCL_OK;
1124}
1125
1126
1127#endif /* OS_UNIX and THREADSAFE and defined(SQLITE_ENABLE_REDEF_IO) */
1128
1129/*
1130** This routine registers the custom TCL commands defined in this
1131** module. This should be the only procedure visible from outside
1132** of this module.
1133*/
1134int Sqlitetestasync_Init(Tcl_Interp *interp){
1135#if OS_UNIX && THREADSAFE && defined(SQLITE_ENABLE_REDEF_IO)
1136 Tcl_CreateObjCommand(interp,"sqlite3async_enable",testAsyncEnable,0,0);
1137 Tcl_CreateObjCommand(interp,"sqlite3async_halt",testAsyncHalt,0,0);
1138 Tcl_CreateObjCommand(interp,"sqlite3async_delay",testAsyncDelay,0,0);
1139 Tcl_CreateObjCommand(interp,"sqlite3async_start",testAsyncStart,0,0);
1140 Tcl_CreateObjCommand(interp,"sqlite3async_wait",testAsyncWait,0,0);
1141#endif /* OS_UNIX and THREADSAFE and defined(SQLITE_ENABLE_REDEF_IO) */
1142 return TCL_OK;
1143}