|
(&aof,&key) == 0) goto werr;
if (rioWriteBulkObject(&aof,o) == 0) goto werr;
} else if (o->type == REDIS_LIST) {
if (rewriteListObject(&aof,&key,o) == 0) goto werr;
} else if (o->type == REDIS_SET) {
if (rewriteSetObject(&aof,&key,o) == 0) goto werr;
} else if (o->type == REDIS_ZSET) {
if (rewriteSortedSetObject(&aof,&key,o) == 0) goto werr;
} else if (o->type == REDIS_HASH) {
if (rewriteHashObject(&aof,&key,o) == 0) goto werr;
} else {
redisPanic("Unknown object type");
}
/* Save the expire time */
if (expiretime != -1) {
char cmd[]="*3\r\n$9\r\nPEXPIREAT\r\n";
if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr;
if (rioWriteBulkObject(&aof,&key) == 0) goto werr;
if (rioWriteBulkLongLong(&aof,expiretime) == 0) goto werr;
}
}
dictReleaseIterator(di);
}
/* Make sure data will not remain on the OS's output buffers */
fflush(fp);
aof_fsync(fileno(fp));
fclose(fp);
/* Use RENAME to make sure the DB file is changed atomically only
* if the generate DB file is ok. */
if (rename(tmpfile,filename) == -1) {
redisLog(REDIS_WARNING,"Error moving temp append only file on the final destination: %s", strerror(errno));
unlink(tmpfile);
return REDIS_ERR;
}
redisLog(REDIS_NOTICE,"SYNC append only file rewrite performed");
return REDIS_OK;
werr:
fclose(fp);
unlink(tmpfile);
redisLog(REDIS_WARNING,"Write error writing append only file on disk: %s", strerror(errno));
if (di) dictReleaseIterator(di);
return REDIS_ERR;
}
/* Write the append only file buffer on disk.
*
* Since we are required to write the AOF before replying to the client,
* and the only way the client socket can get a write is entering when the
* the event loop, we accumulate all the AOF writes in a memory
* buffer and write it on disk using this function just before entering
* the event loop again.
*
* About the 'force' argument:
*
* When the fsync policy is set to 'everysec' we may delay the flush if there
* is still an fsync() going on in the background thread, since for instance
* on Linux write(2) will be blocked by the background fsync anyway.
* When this happens we remember that there is some aof buffer to be
* flushed ASAP, and will try to do that in the serverCron() function.
*
* However if force is set to 1 we'll write regardless of the background
* fsync. */
void flushAppendOnlyFile(int force) {
ssize_t nwritten;
int sync_in_progress = 0;
if (sdslen(server.aof_buf) == 0) return;
if (server.aof_fsync == AOF_FSYNC_EVERYSEC)
sync_in_progress = bioPendingJobsOfType(REDIS_BIO_AOF_FSYNC) != 0;
//判定是否该开始将server.aof_buff中缓存的命令flush到server.aof_fd文件的写缓冲中
if (server.aof_fsync == AOF_FSYNC_EVERYSEC && !force) {
/* With this append fsync policy we do background fsyncing.
* If the fsync is still in progress we can try to delay
* the write for a couple of seconds. */
if (sync_in_progress) {
if (server.aof_flush_postponed_start == 0) {
/* No previous write postponinig, remember that we are
* postponing the flush and return. */
server.aof_flush_postponed_start = server.unixtime;
return;
} else if (server.unixtime - server.aof_flush_postponed_start < 2) {
/* We were already waiting for fsync to finish, but for less
* than two seconds this is still ok. Postpone again. */
return;
}
/* Otherwise fall trough, and go write since we can't wait
* over two seconds. */
server.aof_delayed_fsync++;
redisLog(REDIS_NOTICE,"Asynchronous AOF fsync is taking too long (disk is busy ). Writing the AOF buffer without waiting for fsync to complete, this may slow down Redis.");
}
}
/* If you are following this code path, then we are going to write so
* set reset the postponed flush sentinel to zero. */
server.aof_flush_postponed_start = 0;
/* We want to perform a single write. This should be guaranteed atomic
* at least if the filesystem we are writing is a real physical one.
* While this will save us against the server being killed I don't think
* there is much to do about the whole server stopping for power problems
* or alike */
//将redis最近执行的一些命令(存于server.aof_buf)写入文件(server.aof_fd)
//注意,写入文件并不能保证马上写入磁盘,因为这是带缓冲的写。关于何时将
//文件写缓冲中的命令fync到磁盘,这就要看用户的设置:(见下文)
nwritten = write(server.aof_fd,server.aof_buf,sdslen(server.aof_buf));
if (nwritten != (signed)sdslen(server.aof_buf)) {
/* Ooops, we are in troubles. The best thing to do for now is
* aborting instead of giving the illusion that everything is
* working as expected. */
...
exit(1);
}
server.aof_current_size += nwritten;
/* Re-use AOF buffer when it is small enough. The maximum comes from the
* arena size of 4k minus some overhead (but is otherwise arbitrary). */
if ((sdslen(server.aof_buf)+sdsavail(server.aof_buf)) < 4000) {
sdsclear(server.aof_buf);
} else {
sdsfree(server.aof_buf);
server.aof_buf = sdsempty();
}
//aof_no_fsync_on_rewrite : 该标志位表示当有aof或rdb子进程时,不进行fsync操作
if (server.aof_no_fsync_on_rewrite &&
(server.aof_child_pid != -1 || server.rdb_child_pid != -1))
return;
//fsync...
//每个操作,调用fync将命令持久化 [1]
//间隔1秒,调用fync将aof_buf持久化 [2]
//从不调用fync,由系统自行安排时机(fd的写缓冲区满了)[3]
//【1】
//每个操作都需要将文件缓冲区的写 buff sync到磁盘。从而保证每个redis操作在
//被redis执行后,都能马上持久化,安全性很高,就是磁盘写的系统开销有点大大
if (server.aof_fsync == AOF_FSYNC_ALWAYS) {
/* aof_fsync is defined as fdatasync() for Linux in order to avoid
* flushing metadata. */
aof_fsync(server.aof_fd); /* Let's try to get this data on the disk */
server.aof_last_fsync = server.unixtime;
}
//【2】
//每隔1s将文件缓冲区的写缓冲区sync到磁盘
else if ((server.aof_fsync == AOF_FSYNC_EVERYSEC &&
server.unixtime > server.aof_last_fsync)) {
if (!sync_in_progress) aof_background_fsync(server.aof_fd);
server.aof_last_fsync = server.unixtime;
}
//【3】
//else fd的写缓冲满后会由系统安排执行(听天由命)
}
|