Index: b/Makefile.am =================================================================== --- a/Makefile.am +++ b/Makefile.am @@ -72,7 +72,8 @@ AM_CFLAGS=\ common_s2disk_libs=\ $(LZO_LIBS) \ - $(LIBGCRYPT_LIBS) + $(LIBGCRYPT_LIBS) \ + $(PTHREAD_LIBS) common_s2ram_libs= if ENABLE_SPLASHY Index: b/configure.ac =================================================================== --- a/configure.ac +++ b/configure.ac @@ -80,6 +80,12 @@ AC_ARG_WITH( [devdir="${withval}"], [devdir="/dev"] ) +AC_ARG_ENABLE( + [threads], + [AC_HELP_STRING([--enable-threads], [enable threads support])], + , + [enable_threads="no"] +) AC_ARG_WITH( [initramfsdir], [AC_HELP_STRING([--with-initramfsdir=DIR], [put initramfs binaries in this directory, default LIBDIR/suspend])], @@ -242,6 +248,20 @@ if test "${enable_fbsplash}" = "yes"; th ) fi +if test "${enable_threads}" = "yes"; then + CONFIG_FEATURES="${CONFIG_FEATURES} threads" + AC_DEFINE([CONFIG_THREADS], [1], [Define if threads enabled]) + if test -z "${PTHREAD_LIBS}"; then + AC_ARG_VAR([PTHREAD_LIBS], [linker flags for threads]) + AC_CHECK_LIB( + [pthread], + [pthread_create], + [PTHREAD_LIBS="-lpthread"], + [AC_MSG_ERROR([Required pthread library not found])] + ) + fi +fi + AC_DEFINE_UNQUOTED([CONFIG_FEATURES], ["${CONFIG_FEATURES## }"], [String representation of available features]) AC_HEADER_STDC Index: b/suspend.c =================================================================== --- a/suspend.c +++ b/suspend.c @@ -32,6 +32,9 @@ #include <errno.h> #include <signal.h> #include <termios.h> +#ifdef CONFIG_THREADS +#include <pthread.h> +#endif #ifdef CONFIG_COMPRESS #include <lzo/lzo1x.h> #endif @@ -91,6 +94,11 @@ static enum { } shutdown_method = SHUTDOWN_METHOD_PLATFORM; static int resume_pause; static char verify_image; +#ifdef CONFIG_THREADS +static char use_threads; +#else +#define use_threads 0 +#endif static int suspend_swappiness = SUSPEND_SWAPPINESS; static struct vt_mode orig_vtm; @@ -188,6 +196,13 @@ static struct config_par parameters[] = .fmt = "%c", .ptr = &verify_image, }, +#ifdef CONFIG_THREADS + { + .name = "threads", + .fmt = "%c", + .ptr = &use_threads, + }, +#endif { .name = NULL, .fmt = NULL, @@ -502,7 +517,7 @@ static int write_page(int fd, void *buf, * * @lzo_work_buffer: Work buffer used for compression. * - * @encrypt_buffer: Buffer for storing encrypted pages (page_size bytes). + * @encrypt_buffer: Buffer for storing encrypted data (page_size bytes). * * @encrypt_ptr: Address to store the next encrypted page at. */ @@ -531,11 +546,11 @@ struct swap_writer { */ static void free_swap_writer(struct swap_writer *handle) { - if (do_compress) { - freemem(handle->lzo_work_buffer); + if (handle->write_buffer != handle->buffer) freemem(handle->write_buffer); - } - if (do_encrypt) + if (do_compress) + freemem(handle->lzo_work_buffer); + if (handle->encrypt_buffer) freemem(handle->encrypt_buffer); freemem(handle->buffer); freemem(handle->extents); @@ -553,23 +568,35 @@ static void free_swap_writer(struct swap static int init_swap_writer(struct swap_writer *handle, int dev, int fd) { loff_t offset; + unsigned int write_buf_size = 0; handle->extents = getmem(page_size); handle->buffer = getmem(buffer_size); handle->page_ptr = handle->buffer; - handle->write_buffer = handle->buffer; if (do_encrypt) { handle->encrypt_buffer = getmem(encrypt_buf_size); handle->encrypt_ptr = handle->encrypt_buffer; + } else { + handle->encrypt_buffer = NULL; } if (do_compress) { - handle->write_buffer = getmem(compress_buf_size); handle->lzo_work_buffer = getmem(LZO1X_1_MEM_COMPRESS); + write_buf_size = compress_buf_size; + if (use_threads) + write_buf_size += + (WRITE_BUFFERS - 1) * compress_buf_size; } + if (write_buf_size > 0) + handle->write_buffer = getmem(write_buf_size); + else if (use_threads) + handle->write_buffer = getmem(buffer_size * WRITE_BUFFERS); + else + handle->write_buffer = handle->buffer; + handle->dev = dev; handle->fd = fd; handle->written_data = 0; @@ -661,7 +688,6 @@ static int save_extents(struct swap_writ static loff_t next_swap_page(struct swap_writer *handle) { struct extent ext; - int error; handle->cur_offset += page_size; if (handle->cur_offset < handle->cur_extent->end) @@ -690,16 +716,466 @@ static loff_t next_swap_page(struct swap } /** - * encrypt_and_save_page - encrypt a page of data and write it to the swap + * save_page - save one page of data to the swap + * @handle: Pointer to the structure containing information about + * the swap. + * @src: Pointer to the data. */ -static int encrypt_and_save_page(struct swap_writer *handle, void *src) +static int save_page(struct swap_writer *handle, void *src) { - int error; loff_t offset; + int error; + + offset = next_swap_page(handle); + if (!offset) + return -ENOSPC; + error = write_page(handle->fd, src, offset); + if (error) + return error; + handle->swap_needed -= page_size; + handle->written_data += page_size; + return 0; +} + +#ifdef CONFIG_THREADS +/* + * If threads are used for saving the image with compression and encryption, + * there are three of them. + * + * The main one reads image pages from the kernel and puts them into a work + * buffer. When the work buffer is full, it gets compressed, but that's not an + * in-place compression, so the result has to be stored somewhere else. There + * are four so-called "write" buffers for that and the first empty "write" + * buffer is used as the target. If all of the "write" buffers are full, the + * thread has to wait (see the rules below). Otherwise, after placing the + * (compressed) contents of the work buffer into a "write" buffer, the main + * thread regards the work buffer as empty and starts to read more image pages + * from the kernel. + * + * The second thread (call it the "move" thread) encrypts the contents of the + * "write" buffers, one buffer at a time. It really encrypts individual pages + * and the encryption is not in-place, too. The encrypted pages of data are + * placed in yet another buffer (call it the "encrypt" buffer) until it's full, + * in which case the "move" thread has to wait. Of course, it also has to wait + * for data from the main thread if all of the "write" buffers are empty. + * After encrypting an entire "write" buffer, the "move" thread progresses to + * the next "write" buffer, in a round-robin manner. + * + * The synchronization between the main thread and the "move" thread is done + * with the help of two index variables, move_start and move_end. The rule + * is that: + * (1) the main thread can only put data into write_buffers[move_start], + * (2) after putting data into write_buffers[move_start], the main thread + * increases move_start, modulo the number of "write" buffers, but + * move_start cannot be modified as long as the _next_ "write" buffer is + * write_buffers[move_end] (the thread has to wait if that happens), + * (3) the "move" thread can only read data from write_buffers[move_end] and + * only if move_end != move_start (it has to wait if that's not the case), + * (4) after reading data from write_buffers[move_end], the "move" thread + * increases move_end, modulo the number of "write" buffers. + * This way, move_end always "follows" move_start and the threads don't access + * the same buffer at any time. + * + * The third thread (call it the "save" thread) reads (encrypted) pages of data + * from the "encrypt" buffer and writes them out to the swap. This is done if + * there are some pages to write in the "encrypt" buffer, otherwise the "save" + * thread has to wait for the "move" thread to put more pages in there. + * + * The synchronization between the "move" thread and the "save" thread is done + * with the help of two pointers, save_start and save_end, where save_start + * points to the first empty page and save_end points to the last data page + * that hasn't been written out yet. Thus, the rule is: + * (1) the "move" thread can only put data into the page pointed to by + * save_start, + * (2) after putting data into the page pointed to by save_start, the "move" + * thread increases save_start, modulo the number of pages in the buffer, + * provided that the _next_ page is not the one pointed to by save_end (it + * has to wait if that happens), + * (3) the "save" thread can only read from the page pointed to by save_end, + * as long as save_end != save_start (it has to wait if the two pointers + * are equal), + * (4) after writing data from the page pointed to by save_end, the "save" + * thread increases save_end, modulo the number of pages in the buffer. + * IOW, the "encrypt" buffer is handled as a typical circular buffer with one + * producer (the "move" thread) and one consumer (the "save" thread). + * + * If encryption is not used, the "save" thread is not started and the "move" + * thread writes data to the swap directly out of the "write" buffers. + */ + +static int save_ret; +static pthread_mutex_t finish_mutex; +static pthread_cond_t finish_cond; + +static char *encrypt_buf; +static char *save_start, *save_end; +static pthread_mutex_t save_mutex; +static pthread_cond_t save_cond; +static pthread_t save_th; + +struct write_buffer { + ssize_t size; + void *start; +}; + +static struct write_buffer write_buffers[WRITE_BUFFERS]; +static int move_start, move_end; +static pthread_mutex_t move_mutex; +static pthread_cond_t move_cond; +static pthread_t move_th; + +#define FORCE_EXIT 1 + +static char *save_inc(char *ptr) +{ + return encrypt_buf + + (((ptr - encrypt_buf) + page_size) % encrypt_buf_size); +} + +static int move_inc(int index) +{ + return (index + 1) % WRITE_BUFFERS; +} + +static void wait_for_finish(void) +{ + pthread_mutex_lock(&finish_mutex); + while((save_end != save_start || move_start != move_end) && !save_ret) + pthread_cond_wait(&finish_cond, &finish_mutex); + pthread_mutex_unlock(&finish_mutex); +} + +static void *save_thread(void *arg) +{ + struct swap_writer *handle = arg; + int error = 0; + + for (;;) { + /* Wait until there is a buffer ready for processing. */ + pthread_mutex_lock(&save_mutex); + while(save_end == save_start && !save_ret) + pthread_cond_wait(&save_cond, &save_mutex); + pthread_mutex_unlock(&save_mutex); + + if (save_ret) + return NULL; + + error = save_page(handle, save_end); + if (error) { + pthread_mutex_lock(&finish_mutex); + if (!save_ret) + save_ret = error; + pthread_mutex_unlock(&finish_mutex); + pthread_cond_signal(&move_cond); + pthread_cond_signal(&save_cond); + pthread_cond_signal(&finish_cond); + return NULL; + } + + /* Go to the next page */ + pthread_mutex_lock(&finish_mutex); + pthread_mutex_lock(&save_mutex); + save_end = save_inc(save_end); + pthread_mutex_unlock(&save_mutex); + pthread_mutex_unlock(&finish_mutex); + + pthread_cond_signal(&save_cond); + pthread_cond_signal(&finish_cond); + } + + return NULL; +} #ifdef CONFIG_ENCRYPT - if (do_encrypt) { + +static void encrypt_and_save_buffer(void) +{ + char *src; + ssize_t buf_size, moved_size; + + /* + * The buffer to process is at write_buffers[move_end].start and the + * size of it is write_buffers[move_end].size . + */ + src = write_buffers[move_end].start; + buf_size = write_buffers[move_end].size; + moved_size = 0; + do { + int error; + void *next_start; + + /* Encrypt page_size of data. */ error = gcry_cipher_encrypt(cipher_handle, + save_start, page_size, + src, page_size); + if (error) { + pthread_mutex_lock(&finish_mutex); + if (!save_ret) + save_ret = error; + pthread_mutex_unlock(&finish_mutex); + pthread_cond_signal(&move_cond); + pthread_cond_signal(&save_cond); + pthread_cond_signal(&finish_cond); + break; + } + + moved_size += page_size; + src += page_size; + + pthread_mutex_lock(&save_mutex); + next_start = save_inc(save_start); + while (next_start == save_end && !save_ret) + pthread_cond_wait(&save_cond, &save_mutex); + save_start = next_start; + pthread_mutex_unlock(&save_mutex); + + pthread_cond_signal(&save_cond); + } while (moved_size < buf_size && !save_ret); +} + +#else /* !CONFIG_ENCRYPT */ + +static inline void encrypt_and_save_buffer(void) {} + +#endif /* !CONFIG_ENCRYPT */ + +static void save_buffer(struct swap_writer *handle) +{ + void *src; + ssize_t size; + + /* + * The buffer to process is at write_buffers[move_end].start and the + * size of it is write_buffers[move_end].size . + */ + src = write_buffers[move_end].start; + size = write_buffers[move_end].size; + while (size > 0) { + int error = save_page(handle, src); + if (error) { + pthread_mutex_lock(&finish_mutex); + if (!save_ret) + save_ret = error; + pthread_mutex_unlock(&finish_mutex); + pthread_cond_signal(&move_cond); + pthread_cond_signal(&finish_cond); + break; + } + src += page_size; + size -= page_size; + } +} + +static void *move_thread(void *arg) +{ + struct swap_writer *handle = arg; + + for (;;) { + /* Wait until there is a buffer ready for processing. */ + pthread_mutex_lock(&move_mutex); + while(move_end == move_start && !save_ret) + pthread_cond_wait(&move_cond, &move_mutex); + pthread_mutex_unlock(&move_mutex); + + if (save_ret) + break; + + if (do_encrypt) + encrypt_and_save_buffer(); + else + save_buffer(handle); + + if (save_ret) + break; + + /* Tell the reader thread that we have processed the buffer */ + pthread_mutex_lock(&finish_mutex); + pthread_mutex_lock(&move_mutex); + move_end = move_inc(move_end); + pthread_mutex_unlock(&move_mutex); + pthread_mutex_unlock(&finish_mutex); + + pthread_cond_signal(&move_cond); + pthread_cond_signal(&finish_cond); + } + + return NULL; +} + +static inline void *current_write_buffer(void) +{ + return write_buffers[move_start].start; +} + +static int prepare_next_write_buffer(ssize_t size) +{ + int next_start; + + /* Move to the next buffer and signal that the current one is ready*/ + write_buffers[move_start].size = size; + + pthread_mutex_lock(&move_mutex); + next_start = move_inc(move_start); + while (next_start == move_end && !save_ret) + pthread_cond_wait(&move_cond, &move_mutex); + move_start = next_start; + pthread_mutex_unlock(&move_mutex); + + pthread_cond_signal(&move_cond); + + return save_ret; +} + +static void start_threads(struct swap_writer *handle) +{ + int error; + unsigned int write_buf_size; + char *write_buf; + int j; + + encrypt_buf = handle->encrypt_buffer; + save_start = encrypt_buf; + save_end = save_start; + + write_buf_size = do_compress ? compress_buf_size : buffer_size; + write_buf = handle->write_buffer; + for (j = 0; j < WRITE_BUFFERS; j++) { + write_buffers[j].start = write_buf; + write_buf += write_buf_size; + } + move_start = 0; + move_end = move_start; + + if (do_encrypt) { + error = pthread_mutex_init(&save_mutex, NULL); + if (error) { + perror("pthread_mutex_init() failed:"); + goto Error_exit; + } + error = pthread_cond_init(&save_cond, NULL); + if (error) { + perror("pthread_cond_init() failed:"); + goto Destroy_save_mutex; + } + } + + error = pthread_mutex_init(&move_mutex, NULL); + if (error) { + perror("pthread_mutex_init() failed:"); + goto Destroy_save_cond; + } + error = pthread_cond_init(&move_cond, NULL); + if (error) { + perror("pthread_cond_init() failed:"); + goto Destroy_move_mutex; + } + + if (do_encrypt) { + error = pthread_create(&save_th, NULL, save_thread, handle); + if (error) { + perror("pthread_create() failed:"); + goto Destroy_move_cond; + } + } + + error = pthread_create(&move_th, NULL, move_thread, handle); + if (error) { + perror("pthread_create() failed:"); + goto Stop_save_thread; + } + + error = pthread_mutex_init(&finish_mutex, NULL); + if (error) { + perror("pthread_mutex_init() failed:"); + goto Stop_move_thread; + } + error = pthread_cond_init(&finish_cond, NULL); + if (error) { + perror("pthread_cond_init() failed:"); + goto Destroy_finish_mutex; + } + + return; + + Destroy_finish_mutex: + pthread_mutex_destroy(&finish_mutex); + + Stop_move_thread: + save_ret = FORCE_EXIT; + pthread_cond_signal(&move_cond); + pthread_join(move_th, NULL); + + Stop_save_thread: + if (do_encrypt) { + save_ret = FORCE_EXIT; + pthread_cond_signal(&save_cond); + pthread_join(save_th, NULL); + } + + Destroy_move_cond: + pthread_cond_destroy(&move_cond); + Destroy_move_mutex: + pthread_mutex_destroy(&move_mutex); + + Destroy_save_cond: + if (do_encrypt) + pthread_cond_destroy(&save_cond); + Destroy_save_mutex: + if (do_encrypt) + pthread_mutex_destroy(&save_mutex); + + Error_exit: + use_threads = 0; +} + +static void stop_threads(void) +{ + pthread_mutex_lock(&finish_mutex); + if (!save_ret) + save_ret = FORCE_EXIT; + pthread_mutex_unlock(&finish_mutex); + + pthread_cond_destroy(&finish_cond); + pthread_mutex_destroy(&finish_mutex); + + pthread_cond_signal(&move_cond); + pthread_join(move_th, NULL); + if (do_encrypt) { + pthread_cond_signal(&save_cond); + pthread_join(save_th, NULL); + } + + pthread_cond_destroy(&move_cond); + pthread_mutex_destroy(&move_mutex); + + if (do_encrypt) { + pthread_cond_destroy(&save_cond); + pthread_mutex_destroy(&save_mutex); + } +} + +#else /* !CONFIG_THREADS */ + +static inline void wait_for_finish(void) {} +static inline void *current_write_buffer(void) { return NULL; } +static inline int prepare_next_write_buffer(ssize_t size) +{ + (void)size; + return -ENOSYS; +} +static inline void start_threads(struct swap_writer *handle) { (void)handle; } +static inline void stop_threads(void) {} + +#endif /* !CONFIG_THREADS */ + +/** + * encrypt_and_save_page - encrypt a page of data and write it to the swap + */ +static int encrypt_and_save_page(struct swap_writer *handle, void *src) +{ +#ifdef CONFIG_ENCRYPT + if (do_encrypt) { + int error = gcry_cipher_encrypt(cipher_handle, handle->encrypt_ptr, page_size, src, page_size); if (error) return error; @@ -710,21 +1186,13 @@ static int encrypt_and_save_page(struct handle->encrypt_ptr = handle->encrypt_buffer; } #endif - offset = next_swap_page(handle); - if (!offset) - return -ENOSPC; - error = write_page(handle->fd, src, offset); - if (error) - return error; - handle->swap_needed -= page_size; - handle->written_data += page_size; - return 0; + return save_page(handle, src); } /** - * save_buffer - save data stored in the buffer to the swap + * flush_buffer - flush data stored in the buffer to the swap */ -static int save_buffer(struct swap_writer *handle) +static int flush_buffer(struct swap_writer *handle) { ssize_t size; char *src; @@ -738,10 +1206,12 @@ static int save_buffer(struct swap_write if (compute_checksum || verify_image) md5_process_block(handle->buffer, size, &handle->ctx); + src = use_threads ? current_write_buffer() : handle->write_buffer; + /* Compress the buffer, if necessary */ -#ifdef CONFIG_COMPRESS if (do_compress) { - struct buf_block *block = handle->write_buffer; +#ifdef CONFIG_COMPRESS + struct buf_block *block = (struct buf_block *)src; lzo_uint cnt; lzo1x_1_compress(handle->buffer, size, @@ -749,15 +1219,27 @@ static int save_buffer(struct swap_write handle->lzo_work_buffer); block->size = cnt; size = cnt + sizeof(size_t); - } #endif - /* If there's no compression, handle->buffer == handle->write_buffer */ - for (src = handle->write_buffer; - size > 0; src += page_size, size -= page_size) { + } else if (use_threads) { + memcpy(src, handle->buffer, size); + } + + if (use_threads) + return prepare_next_write_buffer(size); + + /* + * If there's no compression and threads are not used, handle->buffer is + * equal to handle->write_buffer. In that case, the data are taken + * directly out of handle->buffer. + */ + while (size > 0) { error = encrypt_and_save_page(handle, src); if (error) break; + src += page_size; + size -= page_size; } + return error; } @@ -786,6 +1268,9 @@ static int save_image(struct swap_writer printf("%s: %s ", my_name, message); splash.set_caption(message); + if (use_threads) + start_threads(handle); + m = nr_pages / 100; if (!m) m = 1; @@ -840,7 +1325,7 @@ static int save_image(struct swap_writer if (handle->page_ptr - handle->buffer >= buffer_size) { /* The buffer is full, flush it */ - error = save_buffer(handle); + error = flush_buffer(handle); if (error) break; handle->page_ptr = handle->buffer; @@ -849,14 +1334,19 @@ static int save_image(struct swap_writer if (!error) { /* Flush whatever's left in the buffer and save the extents */ - error = save_buffer(handle); + error = flush_buffer(handle); + if (use_threads) + wait_for_finish(); if (!error) error = save_extents(handle, 1); if (!error) printf(" done (%u pages)\n", nr_pages); } -Exit: + Exit: + if (use_threads) + stop_threads(); + if (abort_possible) splash.restore_abort(&savedtrm); @@ -922,7 +1412,6 @@ static int write_image(int snapshot_fd, double real_size; unsigned long nr_pages = 0; int error; - int image_header_read = 0; struct timeval begin; printf("%s: System snapshot ready. Preparing to write\n", my_name); @@ -1108,6 +1597,7 @@ Save_image: Exit: freemem(header); + return error; } @@ -1811,6 +2301,11 @@ int main(int argc, char *argv[]) if (verify_image != 'y' && verify_image != 'Y') verify_image = 0; +#ifdef CONFIG_THREADS + if (use_threads != 'y' && use_threads != 'Y') + use_threads = 0; +#endif + get_page_and_buffer_sizes(); mem_size = 2 * page_size + buffer_size; @@ -1846,6 +2341,13 @@ int main(int argc, char *argv[]) } } #endif + if (use_threads) { + mem_size += (compress_buf_size > 0) ? + (WRITE_BUFFERS - 1) * compress_buf_size : + WRITE_BUFFERS * buffer_size; + if (!do_encrypt) + mem_size += WRITE_BUFFERS * buffer_size; + } ret = init_memalloc(page_size, mem_size); if (ret) { Index: b/swsusp.h =================================================================== --- a/swsusp.h +++ b/swsusp.h @@ -173,6 +173,8 @@ struct buf_block { #define GENERIC_NAME_SIZE 256 +#define WRITE_BUFFERS 4 + extern char *my_name; #ifdef CONFIG_COMPRESS Index: b/conf/suspend.conf =================================================================== --- a/conf/suspend.conf +++ b/conf/suspend.conf @@ -56,4 +56,10 @@ ## pause after resume for n seconds, so that the timing information can ## actually be read (default 0 => don't pause) #resume pause = 2 +# +## use multiple threads for saving the image. +## this option gives a huge speedup for saving the image on multicore +## machines - especially if encryption is used. It is, however, still +## considered somewhat experimental and thus defaults to "n" +# threads = n