blob: ac8bac1f4358cb646cd84b78d57b95e97b8834c1 [file] [log] [blame]
drh23669402006-01-09 17:29:52 +00001/*
2** 2005 December 14
3**
4** The author disclaims copyright to this source code. In place of
5** a legal notice, here is a blessing:
6**
7** May you do good and not evil.
8** May you find forgiveness for yourself and forgive others.
9** May you share freely, never taking more than you give.
10**
11*************************************************************************
12**
13** This file contains an example implementation of an asynchronous IO
drhfe0f75b2006-01-10 20:01:18 +000014** backend for SQLite.
15**
16** WHAT IS ASYNCHRONOUS I/O?
17**
18** With asynchronous I/O, write requests are handled by a separate thread
19** running in the background. This means that the thread that initiates
20** a database write does not have to wait for (sometimes slow) disk I/O
21** to occur. The write seems to happen very quickly, though in reality
22** it is happening at its usual slow pace in the background.
23**
24** Asynchronous I/O appears to give better responsiveness, but at a price.
25** You lose the Durable property. With the default I/O backend of SQLite,
26** once a write completes, you know that the information you wrote is
27** safely on disk. With the asynchronous I/O, this is not the case. If
28** your program crashes or if you suffer a power loss after the database
29** write but before the asynchronous write thread has completed, then the
30** database change might never make it to disk and the next user of the
31** database might not see your change.
32**
33** You lose Durability with asynchronous I/O, but you still retain the
34** other parts of ACID: Atomic, Consistent, and Isolated. Many
35** applications get along fine without the Durability.
36**
37** HOW IT WORKS
38**
39** Asynchronous I/O works by overloading the OS-layer disk I/O routines
40** with modified versions that store the data to be written in a queue of
41** pending write operations. Look at the asyncEnable() subroutine to see
42** how overloading works. Six os-layer routines are overloaded:
43**
44** sqlite3OsOpenReadWrite;
45** sqlite3OsOpenReadOnly;
46** sqlite3OsOpenExclusive;
47** sqlite3OsDelete;
48** sqlite3OsFileExists;
49** sqlite3OsSyncDirectory;
50**
51** The original implementations of these routines are saved and are
52** used by the writer thread to do the real I/O. The substitute
53** implementations typically put the I/O operation on a queue
54** to be handled later by the writer thread, though read operations
55** must be handled right away, obviously.
56**
57** Asynchronous I/O is disabled by setting the os-layer interface routines
58** back to their original values.
59**
60** LIMITATIONS
61**
62** This demonstration code is deliberately kept simple in order to keep
63** the main ideas clear and easy to understand. Real applications that
64** want to do asynchronous I/O might want to add additional capabilities.
65** For example, in this demonstration if writes are happening at a steady
66** stream that exceeds the I/O capability of the background writer thread,
67** the queue of pending write operations will grow without bound until we
68** run out of memory. Users of this technique may want to keep track of
69** the quantity of pending writes and stop accepting new write requests
70** when the buffer gets to be too big.
drh23669402006-01-09 17:29:52 +000071*/
72
73#include "sqliteInt.h"
74#include "os.h"
75#include <tcl.h>
76
/*
** Builds that do not say otherwise are treated as single-threaded:
** THREADSAFE defaults to 0 when it has not been defined.
*/
#if !defined(THREADSAFE)
# define THREADSAFE 0
#endif
81
82/*
83** This test uses pthreads and hence only works on unix and with
drhfe0f75b2006-01-10 20:01:18 +000084** a threadsafe build of SQLite. It also requires that the redefinable
85** I/O feature of SQLite be turned on. This feature is turned off by
86** default. If a required element is missing, almost all of the code
87** in this file is commented out.
drh23669402006-01-09 17:29:52 +000088*/
89#if OS_UNIX && THREADSAFE && defined(SQLITE_ENABLE_REDEF_IO)
90
drhfe0f75b2006-01-10 20:01:18 +000091/*
92** This demo uses pthreads. If you do not have a pthreads implementation
93** for your operating system, you will need to recode the threading
94** logic.
95*/
drh23669402006-01-09 17:29:52 +000096#include <pthread.h>
97#include <sched.h>
98
/*
** Smaller / larger of two values.  Both arguments may be evaluated more
** than once, so never pass expressions that have side effects.
*/
#define MIN(x,y) (((y)>(x)) ? (x) : (y))
#define MAX(x,y) (((y)<(x)) ? (x) : (y))
102
/*
** Forward declarations: both structures are referenced by pointer
** before their full definitions appear further down.
*/
typedef struct AsyncFile AsyncFile;
typedef struct AsyncWrite AsyncWrite;
106
/*
** Debugging aid.  Change the "#if 0" below to "#if 1" to make TRACE()
** print to stderr, prefixed with the calling thread's id.  TRACE takes a
** doubly-parenthesized printf-style argument list, for example
** TRACE(("PUSH %p\n", pWrite)).  When disabled, TRACE expands to nothing.
*/
#if 0
# define TRACE(X) asyncTrace X
static void asyncTrace(const char *zFormat, ...){
  char *z;
  va_list ap;
  va_start(ap, zFormat);
  z = sqlite3_vmprintf(zFormat, ap);
  va_end(ap);
  /* sqlite3_vmprintf() returns NULL when it runs out of memory; passing
  ** NULL to a %s conversion is undefined behavior. */
  fprintf(stderr, "[%d] %s", (int)pthread_self(), z ? z : "(out of memory)\n");
  /* Strings produced by sqlite3_vmprintf() must be released with
  ** sqlite3_free(), not the C library free(). */
  sqlite3_free(z);
}
#else
# define TRACE(X) /* noop */
#endif
122
123/*
drh23669402006-01-09 17:29:52 +0000124** THREAD SAFETY NOTES
125**
126** Basic rules:
127**
128** * Both read and write access to the global write-op queue must be
129** protected by the async.queueMutex.
130**
131** * The file handles from the underlying system are assumed not to
132** be thread safe.
133**
drhfe0f75b2006-01-10 20:01:18 +0000134** * See the last two paragraphs under "The Writer Thread" for
drh23669402006-01-09 17:29:52 +0000135** an assumption to do with file-handle synchronization by the Os.
136**
137** File system operations (invoked by SQLite thread):
138**
139** xOpenXXX (three versions)
140** xDelete
141** xFileExists
142** xSyncDirectory
143**
144** File handle operations (invoked by SQLite thread):
145**
drh23669402006-01-09 17:29:52 +0000146** asyncWrite, asyncClose, asyncTruncate, asyncSync,
147** asyncSetFullSync, asyncOpenDirectory.
148**
drhfe0f75b2006-01-10 20:01:18 +0000149** The operations above add an entry to the global write-op list. They
150** prepare the entry, acquire the async.queueMutex momentarily while
151** list pointers are manipulated to insert the new entry, then release
152** the mutex and signal the writer thread to wake up in case it happens
153** to be asleep.
154**
drh23669402006-01-09 17:29:52 +0000155**
156** asyncRead, asyncFileSize.
drhfe0f75b2006-01-10 20:01:18 +0000157**
158** Read operations. Both of these read from the underlying file
159** first then adjust their result based on pending writes in the
160** write-op queue. So async.queueMutex is held for the duration
161** of these operations to prevent other threads from changing the
162** queue in mid operation.
163**
164**
165** asyncLock, asyncUnlock, asyncLockState, asyncCheckReservedLock
drh23669402006-01-09 17:29:52 +0000166**
167** These locking primitives become no-ops. Files are always opened for
drhfe0f75b2006-01-10 20:01:18 +0000168** exclusive access when using this IO backend.
169**
170**
171** asyncFileHandle.
drh23669402006-01-09 17:29:52 +0000172**
173** The sqlite3OsFileHandle() function is currently only used when
174** debugging the pager module. Unless sqlite3OsClose() is called on the
175** file (shouldn't be possible for other reasons), the underlying
176** implementations are safe to call without grabbing any mutex. So we just
drhfe0f75b2006-01-10 20:01:18 +0000177** go ahead and call it no matter what any other threads are doing.
drh23669402006-01-09 17:29:52 +0000178**
drhfe0f75b2006-01-10 20:01:18 +0000179**
180** asyncSeek.
drh23669402006-01-09 17:29:52 +0000181**
182** Calling this method just manipulates the AsyncFile.iOffset variable.
183** Since this variable is never accessed by the writer thread, this
184** function does not require the mutex. Actual calls to OsSeek() take
185** place just before OsWrite() or OsRead(), which are always protected by
186** the mutex.
drh23669402006-01-09 17:29:52 +0000187**
188** The writer thread:
189**
190** The async.writerMutex is used to make sure there is only
191** a single writer thread running at a time.
192**
193** Inside the writer thread is a loop that works like this:
194**
195** WHILE (write-op list is not empty)
196** Do IO operation at head of write-op list
197** Remove entry from head of write-op list
198** END WHILE
199**
200** The async.queueMutex is always held during the <write-op list is
201** not empty> test, and when the entry is removed from the head
202** of the write-op list. Sometimes it is held for the interim
drhfe0f75b2006-01-10 20:01:18 +0000203** period (while the IO is performed), and sometimes it is
drh23669402006-01-09 17:29:52 +0000204** relinquished. It is relinquished if (a) the IO op is an
205** ASYNC_CLOSE or (b) when the file handle was opened, two of
206** the underlying systems handles were opened on the same
207** file-system entry.
208**
209** If condition (b) above is true, then one file-handle
210** (AsyncFile.pBaseRead) is used exclusively by sqlite threads to read the
211** file, the other (AsyncFile.pBaseWrite) by sqlite3_async_flush()
212** threads to perform write() operations. This means that read
213** operations are not blocked by asynchronous writes (although
214** asynchronous writes may still be blocked by reads).
215**
216** This assumes that the OS keeps two handles open on the same file
217** properly in sync. That is, any read operation that starts after a
218** write operation on the same file system entry has completed returns
219** data consistent with the write. We also assume that if one thread
220** reads a file while another is writing it all bytes other than the
221** ones actually being written contain valid data.
222**
223** If the above assumptions are not true, set the preprocessor symbol
224** SQLITE_ASYNC_TWO_FILEHANDLES to 0.
225*/
226
/*
** When true (the default), read-write files get a second underlying
** handle that is used only for writing.  Define this as 0 at compile time
** if the OS does not keep two handles on the same file consistent.
*/
#if !defined(SQLITE_ASYNC_TWO_FILEHANDLES)
# define SQLITE_ASYNC_TWO_FILEHANDLES 1
#endif
231
232/*
233** State information is held in the static variable "async" defined
234** as follows:
235*/
236static struct TestAsyncStaticData {
237 pthread_mutex_t queueMutex; /* Mutex for access to write operation queue */
238 pthread_mutex_t writerMutex; /* Prevents multiple writer threads */
239 pthread_cond_t queueSignal; /* For waking up sleeping writer thread */
240 pthread_cond_t emptySignal; /* Notify when the write queue is empty */
241 AsyncWrite *pQueueFirst; /* Next write operation to be processed */
242 AsyncWrite *pQueueLast; /* Last write operation on the list */
243 volatile int ioDelay; /* Extra delay between write operations */
244 volatile int writerHaltWhenIdle; /* Writer thread halts when queue empty */
245 volatile int writerHaltNow; /* Writer thread halts after next op */
246} async = {
247 PTHREAD_MUTEX_INITIALIZER,
248 PTHREAD_MUTEX_INITIALIZER,
249 PTHREAD_COND_INITIALIZER,
250 PTHREAD_COND_INITIALIZER,
251};
252
/*
** Possible values of AsyncWrite.op.  See the AsyncWrite structure for how
** each op uses the iOffset, nByte and zBuf fields.
*/
#define ASYNC_NOOP           0   /* Skip (entry's file already had an I/O error) */
#define ASYNC_WRITE          1   /* Write zBuf to the file at iOffset */
#define ASYNC_SYNC           2   /* Sync the file */
#define ASYNC_TRUNCATE       3   /* Truncate the file */
#define ASYNC_CLOSE          4   /* Close the handles and free the AsyncFile */
#define ASYNC_OPENDIRECTORY  5   /* Associate a directory with the file */
#define ASYNC_SETFULLSYNC    6   /* Change the file's full-sync flag */
#define ASYNC_DELETE         7   /* Delete the file named by zBuf */
#define ASYNC_OPENEXCLUSIVE  8   /* Exclusively open the file named by zBuf */
#define ASYNC_SYNCDIRECTORY  9   /* Sync the directory named by zBuf */
265
266/*
drhfe0f75b2006-01-10 20:01:18 +0000267** Entries on the write-op queue are instances of the AsyncWrite
268** structure, defined here.
269**
drh23669402006-01-09 17:29:52 +0000270** The interpretation of the iOffset and nByte variables varies depending
271** on the value of AsyncWrite.op:
272**
273** ASYNC_WRITE:
274** iOffset -> Offset in file to write to.
275** nByte -> Number of bytes of data to write (pointed to by zBuf).
276**
277** ASYNC_SYNC:
278** iOffset -> Unused.
279** nByte -> Value of "fullsync" flag to pass to sqlite3OsSync().
280**
281** ASYNC_TRUNCATE:
282** iOffset -> Size to truncate file to.
283** nByte -> Unused.
284**
285** ASYNC_CLOSE:
286** iOffset -> Unused.
287** nByte -> Unused.
288**
289** ASYNC_OPENDIRECTORY:
290** iOffset -> Unused.
291** nByte -> Number of bytes of zBuf points to (directory name).
292**
293** ASYNC_SETFULLSYNC:
294** iOffset -> Unused.
295** nByte -> New value for the full-sync flag.
296**
297**
298** ASYNC_DELETE:
299** iOffset -> Unused.
300** nByte -> Number of bytes of zBuf points to (file name).
301**
302** ASYNC_OPENEXCLUSIVE:
303** iOffset -> Value of "delflag".
304** nByte -> Number of bytes of zBuf points to (file name).
305**
306** For an ASYNC_WRITE operation, zBuf points to the data to write to the file.
307** This space is sqliteMalloc()d along with the AsyncWrite structure in a
308** single blob, so is deleted when sqliteFree() is called on the parent
309** structure.
310*/
311struct AsyncWrite {
312 AsyncFile *pFile; /* File to write data to or sync */
313 int op; /* One of ASYNC_xxx etc. */
314 i64 iOffset; /* See above */
315 int nByte; /* See above */
316 char *zBuf; /* Data to write to file (or NULL if op!=ASYNC_WRITE) */
317 AsyncWrite *pNext; /* Next write operation (to any file) */
318};
319
320/*
321** The AsyncFile structure is a subclass of OsFile used for asynchronous IO.
322*/
323struct AsyncFile {
324 IoMethod *pMethod; /* Must be first */
drhfc8748a2006-02-13 14:49:38 +0000325 int ioError; /* Value of any asychronous error we have seen */
326 i64 iOffset; /* Current seek() offset in file */
drh23669402006-01-09 17:29:52 +0000327 OsFile *pBaseRead; /* Read handle to the underlying Os file */
328 OsFile *pBaseWrite; /* Write handle to the underlying Os file */
329};
330
331/*
332** Add an entry to the end of the global write-op list. pWrite should point
drhfe0f75b2006-01-10 20:01:18 +0000333** to an AsyncWrite structure allocated using sqlite3OsMalloc(). The writer
334** thread will call sqlite3OsFree() to free the structure after the specified
335** operation has been completed.
drh23669402006-01-09 17:29:52 +0000336**
drhfe0f75b2006-01-10 20:01:18 +0000337** Once an AsyncWrite structure has been added to the list, it becomes the
338** property of the writer thread and must not be read or modified by the
339** caller.
drh23669402006-01-09 17:29:52 +0000340*/
341static void addAsyncWrite(AsyncWrite *pWrite){
drhfe0f75b2006-01-10 20:01:18 +0000342 /* We must hold the queue mutex in order to modify the queue pointers */
drh23669402006-01-09 17:29:52 +0000343 pthread_mutex_lock(&async.queueMutex);
drhfe0f75b2006-01-10 20:01:18 +0000344
345 /* Add the record to the end of the write-op queue */
drh23669402006-01-09 17:29:52 +0000346 assert( !pWrite->pNext );
347 if( async.pQueueLast ){
348 assert( async.pQueueFirst );
349 async.pQueueLast->pNext = pWrite;
350 }else{
351 async.pQueueFirst = pWrite;
352 }
353 async.pQueueLast = pWrite;
drhfc8748a2006-02-13 14:49:38 +0000354 TRACE(("PUSH %p\n", pWrite));
drhfe0f75b2006-01-10 20:01:18 +0000355
356 /* Drop the queue mutex */
drh23669402006-01-09 17:29:52 +0000357 pthread_mutex_unlock(&async.queueMutex);
drhfe0f75b2006-01-10 20:01:18 +0000358
359 /* The writer thread might have been idle because there was nothing
360 ** on the write-op queue for it to do. So wake it up. */
drh23669402006-01-09 17:29:52 +0000361 pthread_cond_signal(&async.queueSignal);
362}
363
364/*
365** This is a utility function to allocate and populate a new AsyncWrite
366** structure and insert it (via addAsyncWrite() ) into the global list.
367*/
368static int addNewAsyncWrite(
369 AsyncFile *pFile,
370 int op,
371 i64 iOffset,
372 int nByte,
373 const char *zByte
374){
drh4b74b262006-02-13 13:50:55 +0000375 AsyncWrite *p;
376 if( pFile && pFile->ioError!=SQLITE_OK ){
377 return pFile->ioError;
378 }
379 p = sqlite3OsMalloc(sizeof(AsyncWrite) + (zByte?nByte:0));
drh23669402006-01-09 17:29:52 +0000380 if( !p ){
381 return SQLITE_NOMEM;
382 }
383 p->op = op;
384 p->iOffset = iOffset;
385 p->nByte = nByte;
386 p->pFile = pFile;
387 p->pNext = 0;
388 if( zByte ){
389 p->zBuf = (char *)&p[1];
390 memcpy(p->zBuf, zByte, nByte);
391 }else{
392 p->zBuf = 0;
393 }
394 addAsyncWrite(p);
395 return SQLITE_OK;
396}
397
398/*
399** Close the file. This just adds an entry to the write-op list, the file is
400** not actually closed.
401*/
402static int asyncClose(OsFile **pId){
403 return addNewAsyncWrite((AsyncFile *)*pId, ASYNC_CLOSE, 0, 0, 0);
404}
405
406/*
407** Implementation of sqlite3OsWrite() for asynchronous files. Instead of
408** writing to the underlying file, this function adds an entry to the end of
409** the global AsyncWrite list. Either SQLITE_OK or SQLITE_NOMEM may be
410** returned.
411*/
412static int asyncWrite(OsFile *id, const void *pBuf, int amt){
413 AsyncFile *pFile = (AsyncFile *)id;
414 int rc = addNewAsyncWrite(pFile, ASYNC_WRITE, pFile->iOffset, amt, pBuf);
415 pFile->iOffset += (i64)amt;
416 return rc;
417}
418
419/*
420** Truncate the file to nByte bytes in length. This just adds an entry to
421** the write-op list, no IO actually takes place.
422*/
423static int asyncTruncate(OsFile *id, i64 nByte){
424 return addNewAsyncWrite((AsyncFile *)id, ASYNC_TRUNCATE, nByte, 0, 0);
425}
426
427/*
428** Open the directory identified by zName and associate it with the
429** specified file. This just adds an entry to the write-op list, the
430** directory is opened later by sqlite3_async_flush().
431*/
432static int asyncOpenDirectory(OsFile *id, const char *zName){
433 AsyncFile *pFile = (AsyncFile *)id;
434 return addNewAsyncWrite(pFile, ASYNC_OPENDIRECTORY, 0, strlen(zName)+1,zName);
435}
436
437/*
438** Sync the file. This just adds an entry to the write-op list, the
439** sync() is done later by sqlite3_async_flush().
440*/
441static int asyncSync(OsFile *id, int fullsync){
442 return addNewAsyncWrite((AsyncFile *)id, ASYNC_SYNC, 0, fullsync, 0);
443}
444
445/*
446** Set (or clear) the full-sync flag on the underlying file. This operation
447** is queued and performed later by sqlite3_async_flush().
448*/
449static void asyncSetFullSync(OsFile *id, int value){
450 addNewAsyncWrite((AsyncFile *)id, ASYNC_SETFULLSYNC, 0, value, 0);
451}
452
453/*
454** Read data from the file. First we read from the filesystem, then adjust
455** the contents of the buffer based on ASYNC_WRITE operations in the
drhfe0f75b2006-01-10 20:01:18 +0000456** write-op queue.
drh23669402006-01-09 17:29:52 +0000457**
458** This method holds the mutex from start to finish.
459*/
460static int asyncRead(OsFile *id, void *obuf, int amt){
461 int rc = SQLITE_OK;
462 i64 filesize;
463 int nRead;
464 AsyncFile *pFile = (AsyncFile *)id;
465
drh4b74b262006-02-13 13:50:55 +0000466 /* If an I/O error has previously occurred on this file, then all
467 ** subsequent operations fail.
468 */
469 if( pFile->ioError!=SQLITE_OK ){
470 return pFile->ioError;
471 }
472
drh23669402006-01-09 17:29:52 +0000473 /* Grab the write queue mutex for the duration of the call */
474 pthread_mutex_lock(&async.queueMutex);
475
476 if( pFile->pBaseRead ){
477 rc = sqlite3OsFileSize(pFile->pBaseRead, &filesize);
478 if( rc!=SQLITE_OK ){
479 goto asyncread_out;
480 }
481 rc = sqlite3OsSeek(pFile->pBaseRead, pFile->iOffset);
482 if( rc!=SQLITE_OK ){
483 goto asyncread_out;
484 }
485 nRead = MIN(filesize - pFile->iOffset, amt);
486 if( nRead>0 ){
487 rc = sqlite3OsRead(((AsyncFile *)id)->pBaseRead, obuf, nRead);
488 }
489 }
490
491 if( rc==SQLITE_OK ){
492 AsyncWrite *p;
493 i64 iOffset = pFile->iOffset; /* Current seek offset */
494
495 for(p=async.pQueueFirst; p; p = p->pNext){
496 if( p->pFile==pFile && p->op==ASYNC_WRITE ){
drh44528382006-02-13 13:30:19 +0000497 int iBeginOut = (p->iOffset - iOffset);
498 int iBeginIn = -iBeginOut;
drh23669402006-01-09 17:29:52 +0000499 int nCopy;
500
501 if( iBeginIn<0 ) iBeginIn = 0;
502 if( iBeginOut<0 ) iBeginOut = 0;
503 nCopy = MIN(p->nByte-iBeginIn, amt-iBeginOut);
504
505 if( nCopy>0 ){
506 memcpy(&((char *)obuf)[iBeginOut], &p->zBuf[iBeginIn], nCopy);
507 }
508 }
509 }
510
511 pFile->iOffset += (i64)amt;
512 }
513
514asyncread_out:
515 pthread_mutex_unlock(&async.queueMutex);
516 return rc;
517}
518
519/*
520** Seek to the specified offset. This just adjusts the AsyncFile.iOffset
521** variable - calling seek() on the underlying file is defered until the
522** next read() or write() operation.
523*/
524static int asyncSeek(OsFile *id, i64 offset){
525 AsyncFile *pFile = (AsyncFile *)id;
526 pFile->iOffset = offset;
527 return SQLITE_OK;
528}
529
530/*
531** Read the size of the file. First we read the size of the file system
532** entry, then adjust for any ASYNC_WRITE or ASYNC_TRUNCATE operations
533** currently in the write-op list.
534**
535** This method holds the mutex from start to finish.
536*/
537int asyncFileSize(OsFile *id, i64 *pSize){
538 int rc = SQLITE_OK;
539 i64 s = 0;
540 OsFile *pBase;
541
542 pthread_mutex_lock(&async.queueMutex);
543
544 /* Read the filesystem size from the base file. If pBaseRead is NULL, this
545 ** means the file hasn't been opened yet. In this case all relevant data
546 ** must be in the write-op queue anyway, so we can omit reading from the
547 ** file-system.
548 */
549 pBase = ((AsyncFile *)id)->pBaseRead;
550 if( pBase ){
551 rc = sqlite3OsFileSize(pBase, &s);
552 }
553
554 if( rc==SQLITE_OK ){
555 AsyncWrite *p;
556 for(p=async.pQueueFirst; p; p = p->pNext){
557 if( p->pFile==(AsyncFile *)id ){
558 switch( p->op ){
559 case ASYNC_WRITE:
560 s = MAX(p->iOffset + (i64)(p->nByte), s);
561 break;
562 case ASYNC_TRUNCATE:
563 s = MIN(s, p->nByte);
564 break;
565 }
566 }
567 }
568 *pSize = s;
569 }
570 pthread_mutex_unlock(&async.queueMutex);
571 return rc;
572}
573
574/*
575** Return the operating system file handle. This is only used for debugging
576** at the moment anyway.
577*/
578static int asyncFileHandle(OsFile *id){
579 return sqlite3OsFileHandle(((AsyncFile *)id)->pBaseRead);
580}
581
drhfe0f75b2006-01-10 20:01:18 +0000582/*
583** No file locking occurs with this version of the asynchronous backend.
584** So the locking routines are no-ops.
585*/
drh23669402006-01-09 17:29:52 +0000586static int asyncLock(OsFile *id, int lockType){
587 return SQLITE_OK;
588}
589static int asyncUnlock(OsFile *id, int lockType){
590 return SQLITE_OK;
591}
592
593/*
594** This function is called when the pager layer first opens a database file
595** and is checking for a hot-journal.
596*/
597static int asyncCheckReservedLock(OsFile *id){
598 return SQLITE_OK;
599}
600
601/*
602** This is broken. But sqlite3OsLockState() is only used for testing anyway.
603*/
604static int asyncLockState(OsFile *id){
605 return SQLITE_OK;
606}
607
608/*
609** The following variables hold pointers to the original versions of
drhfe0f75b2006-01-10 20:01:18 +0000610** OS-layer interface routines that are overloaded in order to create
611** the asynchronous I/O backend.
drh23669402006-01-09 17:29:52 +0000612*/
613static int (*xOrigOpenReadWrite)(const char*, OsFile**, int*) = 0;
614static int (*xOrigOpenExclusive)(const char*, OsFile**, int) = 0;
615static int (*xOrigOpenReadOnly)(const char*, OsFile**) = 0;
616static int (*xOrigDelete)(const char*) = 0;
617static int (*xOrigFileExists)(const char*) = 0;
618static int (*xOrigSyncDirectory)(const char*) = 0;
619
drhfe0f75b2006-01-10 20:01:18 +0000620/*
621** This routine does most of the work of opening a file and building
622** the OsFile structure.
623*/
drh23669402006-01-09 17:29:52 +0000624static int asyncOpenFile(
drhfe0f75b2006-01-10 20:01:18 +0000625 const char *zName, /* The name of the file to be opened */
626 OsFile **pFile, /* Put the OsFile structure here */
627 OsFile *pBaseRead, /* The real OsFile from the real I/O routine */
628 int openForWriting /* Open a second file handle for writing if true */
drh23669402006-01-09 17:29:52 +0000629){
630 int rc;
631 AsyncFile *p;
632 OsFile *pBaseWrite = 0;
633
634 static IoMethod iomethod = {
635 asyncClose,
636 asyncOpenDirectory,
637 asyncRead,
638 asyncWrite,
639 asyncSeek,
640 asyncTruncate,
641 asyncSync,
642 asyncSetFullSync,
643 asyncFileHandle,
644 asyncFileSize,
645 asyncLock,
646 asyncUnlock,
647 asyncLockState,
648 asyncCheckReservedLock
649 };
650
drhfe0f75b2006-01-10 20:01:18 +0000651 if( openForWriting && SQLITE_ASYNC_TWO_FILEHANDLES ){
drh23669402006-01-09 17:29:52 +0000652 int dummy;
653 rc = xOrigOpenReadWrite(zName, &pBaseWrite, &dummy);
654 if( rc!=SQLITE_OK ){
655 goto error_out;
656 }
657 }
658
659 p = (AsyncFile *)sqlite3OsMalloc(sizeof(AsyncFile));
660 if( !p ){
661 rc = SQLITE_NOMEM;
662 goto error_out;
663 }
664 memset(p, 0, sizeof(AsyncFile));
665
666 p->pMethod = &iomethod;
667 p->pBaseRead = pBaseRead;
668 p->pBaseWrite = pBaseWrite;
drh4b74b262006-02-13 13:50:55 +0000669 p->ioError = SQLITE_OK;
drh23669402006-01-09 17:29:52 +0000670
671 *pFile = (OsFile *)p;
672 return SQLITE_OK;
673
674error_out:
675 assert(!p);
676 sqlite3OsClose(&pBaseRead);
677 sqlite3OsClose(&pBaseWrite);
678 *pFile = 0;
679 return rc;
680}
681
682/*
683** The async-IO backends implementation of the three functions used to open
684** a file (xOpenExclusive, xOpenReadWrite and xOpenReadOnly). Most of the
685** work is done in function asyncOpenFile() - see above.
686*/
687static int asyncOpenExclusive(const char *z, OsFile **ppFile, int delFlag){
688 int rc = asyncOpenFile(z, ppFile, 0, 0);
689 if( rc==SQLITE_OK ){
690 AsyncFile *pFile = (AsyncFile *)(*ppFile);
691 int nByte = strlen(z)+1;
692 i64 i = (i64)(delFlag);
693 rc = addNewAsyncWrite(pFile, ASYNC_OPENEXCLUSIVE, i, nByte, z);
694 if( rc!=SQLITE_OK ){
695 sqlite3OsFree(pFile);
696 *ppFile = 0;
697 }
698 }
699 return rc;
700}
701static int asyncOpenReadOnly(const char *z, OsFile **ppFile){
702 OsFile *pBase = 0;
703 int rc = xOrigOpenReadOnly(z, &pBase);
704 if( rc==SQLITE_OK ){
705 rc = asyncOpenFile(z, ppFile, pBase, 0);
706 }
707 return rc;
708}
709static int asyncOpenReadWrite(const char *z, OsFile **ppFile, int *pReadOnly){
710 OsFile *pBase = 0;
711 int rc = xOrigOpenReadWrite(z, &pBase, pReadOnly);
712 if( rc==SQLITE_OK ){
713 rc = asyncOpenFile(z, ppFile, pBase, (*pReadOnly ? 0 : 1));
714 }
715 return rc;
716}
717
718/*
719** Implementation of sqlite3OsDelete. Add an entry to the end of the
720** write-op queue to perform the delete.
721*/
722static int asyncDelete(const char *z){
723 return addNewAsyncWrite(0, ASYNC_DELETE, 0, strlen(z)+1, z);
724}
725
726/*
727** Implementation of sqlite3OsSyncDirectory. Add an entry to the end of the
728** write-op queue to perform the directory sync.
729*/
730static int asyncSyncDirectory(const char *z){
731 return addNewAsyncWrite(0, ASYNC_SYNCDIRECTORY, 0, strlen(z)+1, z);
732}
733
734/*
735** Implementation of sqlite3OsFileExists. Return true if file 'z' exists
736** in the file system.
737**
738** This method holds the mutex from start to finish.
739*/
740static int asyncFileExists(const char *z){
741 int ret;
742 AsyncWrite *p;
743
744 pthread_mutex_lock(&async.queueMutex);
745
746 /* See if the real file system contains the specified file. */
747 ret = xOrigFileExists(z);
748
749 for(p=async.pQueueFirst; p; p = p->pNext){
750 if( p->op==ASYNC_DELETE && 0==strcmp(p->zBuf, z) ){
751 ret = 0;
752 }else if( p->op==ASYNC_OPENEXCLUSIVE && 0==strcmp(p->zBuf, z) ){
753 ret = 1;
754 }
755 }
756
757 pthread_mutex_unlock(&async.queueMutex);
758 return ret;
759}
760
761/*
762** Call this routine to enable or disable the
763** asynchronous IO features implemented in this file.
764**
765** This routine is not even remotely threadsafe. Do not call
766** this routine while any SQLite database connections are open.
767*/
768static void asyncEnable(int enable){
769 if( enable && xOrigOpenReadWrite==0 ){
770 xOrigOpenReadWrite = sqlite3Os.xOpenReadWrite;
771 xOrigOpenReadOnly = sqlite3Os.xOpenReadOnly;
772 xOrigOpenExclusive = sqlite3Os.xOpenExclusive;
773 xOrigDelete = sqlite3Os.xDelete;
774 xOrigFileExists = sqlite3Os.xFileExists;
775 xOrigSyncDirectory = sqlite3Os.xSyncDirectory;
776
777 sqlite3Os.xOpenReadWrite = asyncOpenReadWrite;
778 sqlite3Os.xOpenReadOnly = asyncOpenReadOnly;
779 sqlite3Os.xOpenExclusive = asyncOpenExclusive;
780 sqlite3Os.xDelete = asyncDelete;
781 sqlite3Os.xFileExists = asyncFileExists;
782 sqlite3Os.xSyncDirectory = asyncSyncDirectory;
783 }
784 if( !enable && xOrigOpenReadWrite!=0 ){
785 sqlite3Os.xOpenReadWrite = xOrigOpenReadWrite;
786 sqlite3Os.xOpenReadOnly = xOrigOpenReadOnly;
787 sqlite3Os.xOpenExclusive = xOrigOpenExclusive;
788 sqlite3Os.xDelete = xOrigDelete;
789 sqlite3Os.xFileExists = xOrigFileExists;
790 sqlite3Os.xSyncDirectory = xOrigSyncDirectory;
791
792 xOrigOpenReadWrite = 0;
793 xOrigOpenReadOnly = 0;
794 xOrigOpenExclusive = 0;
795 xOrigDelete = 0;
796 xOrigFileExists = 0;
797 xOrigSyncDirectory = 0;
798 }
799}
800
801/*
802** This procedure runs in a separate thread, reading messages off of the
803** write queue and processing them one by one.
804**
805** If async.writerHaltNow is true, then this procedure exits
806** after processing a single message.
807**
808** If async.writerHaltWhenIdle is true, then this procedure exits when
809** the write queue is empty.
810**
811** If both of the above variables are false, this procedure runs
812** indefinately, waiting for operations to be added to the write queue
813** and processing them in the order in which they arrive.
814**
815** An artifical delay of async.ioDelay milliseconds is inserted before
816** each write operation in order to simulate the effect of a slow disk.
817**
818** Only one instance of this procedure may be running at a time.
819*/
820static void *asyncWriterThread(void *NotUsed){
821 AsyncWrite *p = 0;
822 int rc = SQLITE_OK;
823
824 if( pthread_mutex_trylock(&async.writerMutex) ){
825 return 0;
826 }
827 while( async.writerHaltNow==0 ){
828 int holdingMutex;
829 OsFile *pBase = 0;
830
831 pthread_mutex_lock(&async.queueMutex);
832 holdingMutex = 1;
833 while( (p = async.pQueueFirst)==0 ){
834 pthread_cond_broadcast(&async.emptySignal);
835 if( async.writerHaltWhenIdle ){
836 pthread_mutex_unlock(&async.queueMutex);
837 break;
838 }else{
drhfc8748a2006-02-13 14:49:38 +0000839 TRACE(("IDLE\n"));
drh23669402006-01-09 17:29:52 +0000840 pthread_cond_wait(&async.queueSignal, &async.queueMutex);
drhfc8748a2006-02-13 14:49:38 +0000841 TRACE(("WAKEUP\n"));
drh23669402006-01-09 17:29:52 +0000842 }
843 }
844 if( p==0 ) break;
drhfc8748a2006-02-13 14:49:38 +0000845 TRACE(("PROCESSING %p\n", p));
drh23669402006-01-09 17:29:52 +0000846
847 /* Right now this thread is holding the mutex on the write-op queue.
848 ** Variable 'p' points to the first entry in the write-op queue. In
849 ** the general case, we hold on to the mutex for the entire body of
850 ** the loop.
851 **
852 ** However in the cases enumerated below, we relinquish the mutex,
853 ** perform the IO, and then re-request the mutex before removing 'p' from
854 ** the head of the write-op queue. The idea is to increase concurrency with
855 ** sqlite threads.
856 **
857 ** * An ASYNC_CLOSE operation.
858 ** * An ASYNC_OPENEXCLUSIVE operation. For this one, we relinquish
859 ** the mutex, call the underlying xOpenExclusive() function, then
860 ** re-aquire the mutex before seting the AsyncFile.pBaseRead
861 ** variable.
862 ** * ASYNC_SYNC and ASYNC_WRITE operations, if
863 ** SQLITE_ASYNC_TWO_FILEHANDLES was set at compile time and two
864 ** file-handles are open for the particular file being "synced".
865 */
866 if( p->pFile ){
867 pBase = p->pFile->pBaseWrite;
drh4b74b262006-02-13 13:50:55 +0000868 if( p->pFile->ioError!=SQLITE_OK && p->op!=ASYNC_CLOSE ){
869 p->op = ASYNC_NOOP;
870 }
drh23669402006-01-09 17:29:52 +0000871 if(
872 p->op==ASYNC_CLOSE ||
873 p->op==ASYNC_OPENEXCLUSIVE ||
874 (pBase && (p->op==ASYNC_SYNC || p->op==ASYNC_WRITE) )
875 ){
876 pthread_mutex_unlock(&async.queueMutex);
877 holdingMutex = 0;
878 }
879 if( !pBase ){
880 pBase = p->pFile->pBaseRead;
881 }
882 }
883
884 switch( p->op ){
drh4b74b262006-02-13 13:50:55 +0000885 case ASYNC_NOOP:
886 break;
887
drh23669402006-01-09 17:29:52 +0000888 case ASYNC_WRITE:
889 assert( pBase );
890 rc = sqlite3OsSeek(pBase, p->iOffset);
891 if( rc==SQLITE_OK ){
892 rc = sqlite3OsWrite(pBase, (const void *)(p->zBuf), p->nByte);
893 }
894 break;
895
896 case ASYNC_SYNC:
897 assert( pBase );
898 rc = sqlite3OsSync(pBase, p->nByte);
899 break;
900
901 case ASYNC_TRUNCATE:
902 assert( pBase );
903 rc = sqlite3OsTruncate(pBase, p->nByte);
904 break;
905
906 case ASYNC_CLOSE:
907 sqlite3OsClose(&p->pFile->pBaseRead);
908 sqlite3OsClose(&p->pFile->pBaseWrite);
909 sqlite3OsFree(p->pFile);
910 break;
911
912 case ASYNC_OPENDIRECTORY:
913 assert( pBase );
914 sqlite3OsOpenDirectory(pBase, p->zBuf);
915 break;
916
917 case ASYNC_SETFULLSYNC:
918 assert( pBase );
919 sqlite3OsSetFullSync(pBase, p->nByte);
920 break;
921
922 case ASYNC_DELETE:
923 rc = xOrigDelete(p->zBuf);
924 break;
925
926 case ASYNC_SYNCDIRECTORY:
927 rc = xOrigSyncDirectory(p->zBuf);
928 break;
929
930 case ASYNC_OPENEXCLUSIVE: {
931 AsyncFile *pFile = p->pFile;
932 int delFlag = ((p->iOffset)?1:0);
933 OsFile *pBase = 0;
934 rc = xOrigOpenExclusive(p->zBuf, &pBase, delFlag);
935 assert( holdingMutex==0 );
936 pthread_mutex_lock(&async.queueMutex);
937 holdingMutex = 1;
938 if( rc==SQLITE_OK ){
939 pFile->pBaseRead = pBase;
940 }
941 break;
942 }
943
944 default: assert(!"Illegal value for AsyncWrite.op");
945 }
946
drh4b74b262006-02-13 13:50:55 +0000947 /* If an error happens, store the error code in the pFile.ioError
948 ** field. This will prevent any future operations on that file,
949 ** other than closing it.
950 **
951 ** We cannot report the error back to the connection that requested
952 ** the I/O since the error happened asynchronously. The connection has
953 ** already moved on. There really is nobody to report the error to.
954 */
955 if( rc!=SQLITE_OK ){
956 p->pFile->ioError = rc;
957 rc = SQLITE_OK;
958 }
959
drh23669402006-01-09 17:29:52 +0000960 /* If we didn't hang on to the mutex during the IO op, obtain it now
961 ** so that the AsyncWrite structure can be safely removed from the
962 ** global write-op queue.
963 */
964 if( !holdingMutex ){
965 pthread_mutex_lock(&async.queueMutex);
966 holdingMutex = 1;
967 }
drhfc8748a2006-02-13 14:49:38 +0000968 TRACE(("UNLINK %p\n", p));
drh4b74b262006-02-13 13:50:55 +0000969 if( p==async.pQueueLast ){
970 async.pQueueLast = 0;
drh23669402006-01-09 17:29:52 +0000971 }
drh4b74b262006-02-13 13:50:55 +0000972 async.pQueueFirst = p->pNext;
drh5c323542006-02-13 13:23:57 +0000973 sqlite3OsFree(p);
drh23669402006-01-09 17:29:52 +0000974 assert( holdingMutex );
975
976 /* Drop the queue mutex before continuing to the next write operation
977 ** in order to give other threads a chance to work with the write queue.
978 */
979 pthread_mutex_unlock(&async.queueMutex);
980 if( async.ioDelay>0 ){
981 sqlite3OsSleep(async.ioDelay);
982 }else{
983 sched_yield();
984 }
985 }
986 pthread_mutex_unlock(&async.writerMutex);
987 return 0;
988}
989
990/**************************************************************************
991** The remaining code defines a Tcl interface for testing the asynchronous
992** IO implementation in this file.
993**
994** To adapt the code to a non-TCL environment, delete or comment out
995** the code that follows.
996*/
997
998/*
999** sqlite3async_enable ?YES/NO?
1000**
1001** Enable or disable the asynchronous I/O backend. This command is
1002** not thread-safe. Do not call it while any database connections
1003** are open.
1004*/
1005static int testAsyncEnable(
1006 void * clientData,
1007 Tcl_Interp *interp,
1008 int objc,
1009 Tcl_Obj *CONST objv[]
1010){
1011 if( objc!=1 && objc!=2 ){
1012 Tcl_WrongNumArgs(interp, 1, objv, "?YES/NO?");
1013 return TCL_ERROR;
1014 }
1015 if( objc==1 ){
1016 Tcl_SetObjResult(interp, Tcl_NewBooleanObj(xOrigOpenReadWrite!=0));
1017 }else{
1018 int en;
1019 if( Tcl_GetBooleanFromObj(interp, objv[1], &en) ) return TCL_ERROR;
1020 asyncEnable(en);
1021 }
1022 return TCL_OK;
1023}
1024
1025/*
1026** sqlite3async_halt "now"|"idle"|"never"
1027**
1028** Set the conditions at which the writer thread will halt.
1029*/
1030static int testAsyncHalt(
1031 void * clientData,
1032 Tcl_Interp *interp,
1033 int objc,
1034 Tcl_Obj *CONST objv[]
1035){
1036 const char *zCond;
1037 if( objc!=2 ){
1038 Tcl_WrongNumArgs(interp, 1, objv, "\"now\"|\"idle\"|\"never\"");
1039 return TCL_ERROR;
1040 }
1041 zCond = Tcl_GetString(objv[1]);
1042 if( strcmp(zCond, "now")==0 ){
1043 async.writerHaltNow = 1;
1044 pthread_cond_broadcast(&async.queueSignal);
1045 }else if( strcmp(zCond, "idle")==0 ){
1046 async.writerHaltWhenIdle = 1;
1047 async.writerHaltNow = 0;
1048 pthread_cond_broadcast(&async.queueSignal);
1049 }else if( strcmp(zCond, "never")==0 ){
1050 async.writerHaltWhenIdle = 0;
1051 async.writerHaltNow = 0;
1052 }else{
1053 Tcl_AppendResult(interp,
1054 "should be one of: \"now\", \"idle\", or \"never\"", (char*)0);
1055 return TCL_ERROR;
1056 }
1057 return TCL_OK;
1058}
1059
1060/*
1061** sqlite3async_delay ?MS?
1062**
1063** Query or set the number of milliseconds of delay in the writer
1064** thread after each write operation. The default is 0. By increasing
1065** the memory delay we can simulate the effect of slow disk I/O.
1066*/
1067static int testAsyncDelay(
1068 void * clientData,
1069 Tcl_Interp *interp,
1070 int objc,
1071 Tcl_Obj *CONST objv[]
1072){
1073 if( objc!=1 && objc!=2 ){
1074 Tcl_WrongNumArgs(interp, 1, objv, "?MS?");
1075 return TCL_ERROR;
1076 }
1077 if( objc==1 ){
1078 Tcl_SetObjResult(interp, Tcl_NewIntObj(async.ioDelay));
1079 }else{
1080 int ioDelay;
1081 if( Tcl_GetIntFromObj(interp, objv[1], &ioDelay) ) return TCL_ERROR;
1082 async.ioDelay = ioDelay;
1083 }
1084 return TCL_OK;
1085}
1086
1087/*
1088** sqlite3async_start
1089**
1090** Start a new writer thread.
1091*/
1092static int testAsyncStart(
1093 void * clientData,
1094 Tcl_Interp *interp,
1095 int objc,
1096 Tcl_Obj *CONST objv[]
1097){
1098 pthread_t x;
1099 int rc;
1100 rc = pthread_create(&x, 0, asyncWriterThread, 0);
1101 if( rc ){
1102 Tcl_AppendResult(interp, "failed to create the thread", 0);
1103 return TCL_ERROR;
1104 }
1105 pthread_detach(x);
1106 return TCL_OK;
1107}
1108
1109/*
1110** sqlite3async_wait
1111**
1112** Wait for the current writer thread to terminate.
1113**
1114** If the current writer thread is set to run forever then this
1115** command would block forever. To prevent that, an error is returned.
1116*/
1117static int testAsyncWait(
1118 void * clientData,
1119 Tcl_Interp *interp,
1120 int objc,
1121 Tcl_Obj *CONST objv[]
1122){
1123 if( async.writerHaltNow==0 && async.writerHaltWhenIdle==0 ){
1124 Tcl_AppendResult(interp, "would block forever", (char*)0);
1125 return TCL_ERROR;
1126 }
drhfc8748a2006-02-13 14:49:38 +00001127 TRACE(("WAIT\n"));
drh23669402006-01-09 17:29:52 +00001128 pthread_cond_broadcast(&async.queueSignal);
1129 pthread_mutex_lock(&async.writerMutex);
1130 pthread_mutex_unlock(&async.writerMutex);
1131 return TCL_OK;
1132}
1133
1134
1135#endif /* OS_UNIX and THREADSAFE and defined(SQLITE_ENABLE_REDEF_IO) */
1136
1137/*
1138** This routine registers the custom TCL commands defined in this
1139** module. This should be the only procedure visible from outside
1140** of this module.
1141*/
1142int Sqlitetestasync_Init(Tcl_Interp *interp){
1143#if OS_UNIX && THREADSAFE && defined(SQLITE_ENABLE_REDEF_IO)
1144 Tcl_CreateObjCommand(interp,"sqlite3async_enable",testAsyncEnable,0,0);
1145 Tcl_CreateObjCommand(interp,"sqlite3async_halt",testAsyncHalt,0,0);
1146 Tcl_CreateObjCommand(interp,"sqlite3async_delay",testAsyncDelay,0,0);
1147 Tcl_CreateObjCommand(interp,"sqlite3async_start",testAsyncStart,0,0);
1148 Tcl_CreateObjCommand(interp,"sqlite3async_wait",testAsyncWait,0,0);
1149#endif /* OS_UNIX and THREADSAFE and defined(SQLITE_ENABLE_REDEF_IO) */
1150 return TCL_OK;
1151}