/*
    compressloop-distributed, creates a cloop version 2 compressed image
    Copyright (C) 2004  James Cameron <quozl@us.netrek.org>

    This program is free software; you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation; either version 2 of the License, or
    (at your option) any later version.

    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with this program; if not, write to the Free Software
    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA


    Derived from compressloop.c (c) 1999 by Paul `Rusty' Russell, GPL,
    with contributions in 2000 and 2001 by Klaus Knopper, and in 2002
    by Valentijn Sessink.


    DESIGN

    master {
	start slaves
	read input file and feed uncompressed blocks to slaves
	accept compressed blocks from slaves
	merge back into order and write to output file
    }

    slave {
        listen for connections
        while(true) {
	    accept connection
	    while(connected) { read blocks, compress, write blocks }
	}
    }

    TODO

    - profiling, to ensure we really are spending all of the time
      waiting in the event loop where we should (otherwise parallelism
      must suffer).
    - in test network, one slave is kept 100% busy but another isn't, why?
    - a design fault, without increasing TCP socket buffers from the
      default, processing halts because of an inter-slave channel
      deadlock; the master is waiting for a write() to complete to the
      slave's connection, yet the slave is waiting for the master to
      open the window by reading what it has already sent; this might
      be fixed by pacing.
    - gracefully degrade if a slave is unavailable or fails during run.
    - cleanup stderr output and make verbosity consistent.
    - accept compression level and pass to slaves (protocol change).
    - add protocol version number for backward compatibility.

*/

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <netdb.h>
#include <asm/byteorder.h>
#include <errno.h>
#include <glib.h>

/*--------------------------*/
/* Local copy of the cloop image header definition.  The guard macro lets
   this coexist with an included compressed_loop.h defining the same. */
#ifndef _COMPRESSED_LOOP_H
#define _COMPRESSED_LOOP_H

/* bytes reserved at the start of the image for CLOOP_PREAMBLE */
#define CLOOP_HEADROOM 128

/* On-disk image header, written at offset 0 by output_close().
   block_size and num_blocks are stored in network byte order (htonl).
   The field order and sizes define the file format: do not reorder. */
struct cloop_head
{
        char preamble[CLOOP_HEADROOM];
        u_int32_t block_size;
        u_int32_t num_blocks;
};

#endif /*_COMPRESSED_LOOP_H*/
/*--------------------------*/

/* Advisory upper bound on the block size; main() only warns above it.
   Parenthesised so the macro expands safely inside any expression
   (unparenthesised, MAX_KMALLOC_SIZE / 2 would parse as 2L << (17/2)). */
#define MAX_KMALLOC_SIZE (2L<<17)

/* Shell script stored at the start of the image header (must fit in
   CLOOP_HEADROOM; main() checks this at startup). */
#define CLOOP_PREAMBLE "#!/bin/sh\n" "#V2.0 Format\n" "insmod cloop.o file=$0 && mount -r -t iso9660 /dev/cloop $1\n" "exit $?\n"

/* defaults */
#define IMAGESIZE 2000000000	/* input size assumed if it cannot be determined */
#define BLOCKSIZE 65536		/* uncompressed bytes per block */

#define PORT 8522		/* slave TCP port used when host has no :port */
#define VERSION 20040717	/* protocol version sent to each slave on connect */

/* per slave structure: one connection to a remote compressor, created by
   slave_new() and referenced as the user data of both of its watches */
struct slave {
  char *host;			/* host name (g_strdup'd copy)		*/
  int port;			/* port number				*/
  int sock;			/* file descriptor of network socket	*/
  GIOChannel *channel;		/* glib I/O channel wrapping the socket	*/
  guint writable, readable;	/* glib watch ids for G_IO_OUT / G_IO_IN */
  int blocks;			/* compressed blocks received (slave_get) */
};
 
/* a data block structure; released with block_free() */
struct block {
  unsigned long number;		/* block number in input file		*/
  int size;			/* size of the data in bytes		*/
  char *data;			/* heap pointer to data, may be NULL	*/
  /* uncompressed or compressed, depends on context: input_get() yields
     uncompressed blocks padded to blocksize, slave_get() yields the
     compressed bytes returned by a slave */
};
	
GMainLoop *loop;	/* main processing event loop	*/
int ended;		/* input end of file was seen	*/
/* head: number to give the next block read from input (= blocks read);
   tail: number of the next block due to be written to the output.
   All blocks are written once ended is set and tail == head. */
int head;		/* current head of queue (input block number)	*/
int tail;		/* current tail of queue (output block number)	*/

int in;			/* file descriptor of input	*/
int out;	        /* file descriptor of output	*/

/* image geometry and the in-memory index, set up by input_open() and
   output_open(), filled by output_put() and flushed by output_close() */
unsigned long blocksize=BLOCKSIZE;	/* bytes per block		*/
unsigned int indexsize;			/* size of index in bytes	*/
unsigned long guessedblocks;		/* guessed count of blocks	*/
unsigned long countedblocks=0;		/* count of blocks written	*/
loff_t *pointerblock;			/* in-memory block index	*/
int pointercurrent;			/* current index pointer number */
unsigned long position;			/* offset to current block	*/
unsigned long imagesize=IMAGESIZE;	/* input image size		*/
unsigned int forceimagesize=0;		/* -I given: ignore fstat size	*/
unsigned int verbose=0;			/* -v count			*/
unsigned int determinedimagesize=0;	/* imagesize came from fstat	*/

/* out-of-order compressed blocks waiting for their predecessors */
GHashTable *queue;	/* block merge queue, hashed by block number	*/


/* free a block structure and its data buffer
 *
 * block: block to release; NULL is accepted and ignored.  The data
 * buffer may be NULL (e.g. a zero-length block).
 */
static void block_free(struct block *block) {
  if (block == NULL) return;
  g_free(block->data);		/* g_free(NULL) is a no-op */
  g_free(block);
}


/* open the input file and calculate sizes
 *
 * name: input path, or "-" to read from (a dup of) standard input.
 * Sets `in`.  imagesize is taken from fstat() unless that fails, reports
 * zero (pipes, some devices) or -I forced a size; otherwise the value
 * from -i or the default is kept.  Derives guessedblocks and indexsize and
 * allocates the in-memory index.  Exits on any failure.
 */
static void input_open(char *name) {
  struct stat filestat;
  unsigned long long bytes;

  in = strcmp(name, "-") == 0 ? dup(fileno(stdin)) : open(name, O_RDONLY);
  if (in < 0) { perror("input_open: open"); exit(1); }

  if ((fstat(in, &filestat) == 0) && (filestat.st_size != 0) &&
      (!forceimagesize)) {
    imagesize = filestat.st_size;
    determinedimagesize = 1;
    if (verbose) fprintf(stderr, "Input file size determined: %lli\n", 
			 (long long) filestat.st_size);
  }

  /* one spare block covers a trailing partial block */
  guessedblocks = (imagesize / blocksize) + 1;

  /* the index holds one offset per block plus the final end offset;
     compute its size wide so an oversized index is rejected, not wrapped */
  bytes = (unsigned long long) sizeof(loff_t) * (guessedblocks + 1);
  if (bytes != (unsigned int) bytes) {
    fprintf(stderr, "Sorry, index for %lu blocks is too large\n",
	    guessedblocks);
    exit(1);
  }
  indexsize = (unsigned int) bytes;

  /* zero-filled: output_close() writes the whole index, including entries
     beyond the blocks actually produced, so they must not be heap garbage */
  if ((pointerblock = calloc(guessedblocks + 1, sizeof(loff_t))) == NULL) {
    fprintf(stderr, "Sorry, could not allocate pointerblock\n");
    exit(1);
  }
}

/* read the next input block; NULL when end of file is reached
 *
 * The block is always blocksize bytes: a short final read is reported on
 * stderr and left zero-padded.  It takes the number held in `head`, which
 * is then advanced.  The caller owns the result and frees it with
 * block_free().  A read error terminates the program.
 */
static struct block *input_get() {
  struct block *block;
  int filled;
  ssize_t got;

  block = g_new(struct block, 1);
  g_assert (block != NULL);
  block->data = g_new0(char, blocksize);	/* zeroed, so padding is free */
  g_assert (block->data != NULL);

  for (filled = 0; filled < blocksize; filled += got) {
    got = read(in, block->data + filled, blocksize - filled);
    if (got < 0) { perror("read"); exit(1); }
    if (got != 0) continue;

    /* end of file: nothing read at all means there is no block */
    if (filled == 0) {
      block_free(block);
      return NULL;
    }
    fprintf(stderr, " input_get: partial, %d bytes of %lu, padding up\n", 
	    filled, blocksize);
    break;
  }

  block->size = blocksize;
  block->number = head++;
  if (verbose > 3)
    fprintf(stderr, "  input_get: block %lu size %d\n", 
	    block->number, block->size);
  return block;
}

/* initialise the merge queue */
static void queue_init() {
  queue = g_hash_table_new(g_int_hash, g_int_equal);
}

/* put a block into the merge queue */
static void queue_put(struct block *block) {
  if (verbose > 3) fprintf(stderr, "  queue_put: block %lu size %d\n", 
			   block->number, block->size);
  g_hash_table_insert(queue, &block->number, block);
}

/* get a block from the merge queue, and remove it */
static struct block *queue_get(int number) {
  struct block *block;
  block = g_hash_table_lookup(queue, &number);
  if (block == NULL) {
    if (verbose > 3) fprintf(stderr, "  queue_get: block %d not here\n", 
			     number);
    return NULL;
  }
  g_hash_table_remove(queue, &number);
  if (verbose > 3) fprintf(stderr, "  queue_get: block %lu size %d\n", 
			   block->number, block->size);
  return block;
}


/* start the output file
 *
 * name: output path; created or truncated, mode 0644 (before umask).
 * The header and index are not written yet.  The file offset is left just
 * past the space reserved for them (output_close() fills that in later),
 * so block data can be appended sequentially.  Needs indexsize, i.e.
 * input_open() must have run.  Exits on failure.
 */
static void output_open(char *name) {

  /* create file */
  out = open(name, O_WRONLY|O_TRUNC|O_CREAT,S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
  if (out < 0) { perror("output_open: open"); exit(1); }

  /* set position of first block */
  position = sizeof(struct cloop_head) + indexsize;

  /* seek beyond header and index; an absolute seek states the intent
     instead of relying on a fresh descriptor starting at offset 0 */
  if (lseek(out, (off_t) position, SEEK_SET) < 0) {
    perror("output_open: lseek"); exit(1);
  }
  pointercurrent = 0;
}

/* put a block into the output (caller to be strict in ordering)
 *
 * Records the block's file offset (big-endian 64-bit) in the next index
 * slot and appends its data.  The caller keeps ownership of the block.
 * Exits if more blocks arrive than the index was sized for, or on a
 * write error.
 */
static void output_put(struct block *block) {
  int written = 0;

  /* the index has guessedblocks data slots plus the final end offset;
     check before touching the index or the file */
  if (countedblocks >= guessedblocks) {
    fprintf(stderr, "You guessed the number of blocks wrong.  Failing.\n");
    exit(1);
  }

  /* fill in the index */
  pointerblock[pointercurrent++] = __cpu_to_be64(position);
  position += block->size;

  /* write to the file, resuming after short writes and signals; a short
     write does not set errno, so it must not be reported via perror */
  while (written < block->size) {
    ssize_t done = write(out, block->data + written, block->size - written);
    if (done < 0) {
      if (errno == EINTR) continue;
      perror("output_put: write"); exit(1);
    }
    if (done == 0) {
      fprintf(stderr, "output_put: write: no progress\n"); exit(1);
    }
    written += done;
  }

  if (verbose > 3) 
    fprintf(stderr, "  output_put: block %lu size %d offset %lu\n", 
	    block->number, block->size, position - block->size);
  else
    if (verbose > 1) fprintf(stderr, ".");

  countedblocks++;
}

/* finish the output file
 *
 * Stores the end offset of the last block as the final index entry.  It
 * then rewinds and writes the cloop header (preamble, block size and block
 * count in network order) followed by the whole in-memory index, and
 * closes the file.  Exits on any error, including one reported only by
 * close().
 */
static void output_close() {
  struct cloop_head header;

  /* make note of last block index */
  pointerblock[pointercurrent]=__cpu_to_be64(position);
  if (verbose) {
    fprintf(stderr, "output_close: counted blocks: %lli (guessed %lli)\n", 
	    (long long) countedblocks, (long long) guessedblocks);
  }

  /* seek to the header position */
  if (lseek(out, (off_t) 0, SEEK_SET) < 0) {
    perror("output_close: header lseek"); exit(1);
  }

  /* construct and write out the header (preamble length checked in main) */
  memset(header.preamble, 0, sizeof(header.preamble));
  memcpy(header.preamble, CLOOP_PREAMBLE, sizeof(CLOOP_PREAMBLE));
  header.block_size = htonl(blocksize);
  header.num_blocks = htonl(countedblocks);
  if (write(out, &header, sizeof(header)) != (ssize_t) sizeof(header)) {
    perror("output_close: header write"); exit(1);
  }

  /* write out the index */
  if (write(out, pointerblock, indexsize) != (ssize_t) indexsize) {
    perror("output_close: index write"); exit(1);
  }

  /* close file; deferred write errors (e.g. on network filesystems)
     can surface only here */
  if (close(out) < 0) {
    perror("output_close: close"); exit(1);
  }
}

/* persistent form of write
 *
 * Calls g_io_channel_write() repeatedly until all count bytes of buf have
 * been sent or a call reports an error.  *bytes_written receives the total
 * sent, including any partial progress from a failing call.  Returns the
 * last GIOError seen (G_IO_ERROR_NONE when everything was written).
 */
static GIOError g_io_channel_write_persist(GIOChannel *channel, gchar *buf, 
					  guint count, guint *bytes_written) {
  GIOError result = G_IO_ERROR_NONE;
  guint sent = 0;
  guint step;

  while (sent < count && result == G_IO_ERROR_NONE) {
    result = g_io_channel_write(channel, buf + sent, count - sent, &step);
    sent += step;
  }
  *bytes_written = sent;
  return result;
}


/* send a block to a slave */
static void slave_put(struct slave *slave, struct block *block) {
  GIOError gioerr;
  guint bytes_written;
  unsigned long number;
  int size;

  /* send block number */
  number = htonl(block->number);
  gioerr = g_io_channel_write_persist(slave->channel, (char *) &number, 
			      sizeof(number), &bytes_written);
  g_assert (bytes_written == sizeof(number));
  g_assert (gioerr == G_IO_ERROR_NONE);

  /* send block size */
  size = htonl(block->size);
  gioerr = g_io_channel_write_persist(slave->channel, (char *) &size, 
			      sizeof(size), &bytes_written);
  g_assert (bytes_written == sizeof(size));
  g_assert (gioerr == G_IO_ERROR_NONE);

  /* send block data */
  gioerr = g_io_channel_write_persist(slave->channel, block->data, 
			      block->size, &bytes_written);
  g_assert (bytes_written == block->size);
  g_assert (gioerr == G_IO_ERROR_NONE);

  if (verbose > 3) fprintf(stderr, "  slave_put: %s:%d: block %lu size %d\n", 
			   slave->host, slave->port, 
			   block->number, block->size);
}

/* persistent form of read
 *
 * Calls g_io_channel_read() until count bytes have arrived in buf, a call
 * reports an error, or the peer closes the connection.  *bytes_read
 * receives the total read.  Returns the last GIOError seen.  At end of
 * file the result is G_IO_ERROR_NONE with *bytes_read < count, so callers
 * must check the count.
 */
static GIOError g_io_channel_read_persist(GIOChannel *channel, gchar *buf, 
					  guint count, guint *bytes_read) {
  guint bytes_this;
  GIOError gioerr = G_IO_ERROR_NONE;

  *bytes_read = 0;
  while (*bytes_read < count) {
    bytes_this = 0;
    gioerr = g_io_channel_read(channel, buf+(*bytes_read), 
			       count-(*bytes_read), &bytes_this);
    *bytes_read += bytes_this;
    if (gioerr != G_IO_ERROR_NONE) return gioerr;
    /* g_io_channel_read maps end of file to G_IO_ERROR_NONE with zero
       bytes; without this the loop would spin forever once a slave
       disconnects */
    if (bytes_this == 0) break;
  }
  return gioerr;
}

/* get a block returned by a slave */
static struct block *slave_get(struct slave *slave) {
  GIOError gioerr;
  guint bytes_read;
  unsigned long number;
  int size;
  struct block *block = g_new(struct block, 1);

  g_assert (block != NULL);

  /* receive block number */
  gioerr = g_io_channel_read_persist(slave->channel, (char *) &number, 
				     sizeof(number), &bytes_read);
  g_assert (gioerr == G_IO_ERROR_NONE);
  g_assert (bytes_read == sizeof(number));
  block->number = ntohl(number);

  /* receive block size */
  gioerr = g_io_channel_read_persist(slave->channel, (char *) &size, 
				     sizeof(size), &bytes_read);
  g_assert (gioerr == G_IO_ERROR_NONE);
  g_assert (bytes_read == sizeof(size));
  block->size = ntohl(size);

  /* allocate block data */
  g_assert ((block->size < (65536*2)));
  block->data = malloc(block->size);

  /* receive block data */
  gioerr = g_io_channel_read_persist(slave->channel, block->data, 
				     block->size, &bytes_read);
  g_assert (gioerr == G_IO_ERROR_NONE);
  g_assert (bytes_read == block->size);

  /* return the block, caller to free */
  if (verbose > 3) fprintf(stderr, " slave_get: %s:%d: block %lu size %d\n", 
			   slave->host, slave->port, block->number, 
			   block->size);

  slave->blocks++;
  return block;
}

/* a slave compressor is ready to take data
 *
 * glib G_IO_OUT watch callback.  Reads one input block and sends it to
 * this slave.  Returns FALSE (removing the watch) once input has ended.
 * If end of input is reached with no block outstanding, it quits the main
 * loop itself.
 */
static gboolean slave_writable(GIOChannel *source, GIOCondition condition, 
			       gpointer data) {
  struct slave *slave = (struct slave *) data;
  struct block *block;

  if (verbose > 3) fprintf(stderr, "slave_writable: %s:%d\n", 
			   slave->host, slave->port);
  /* if we have already seen end of input, no need to be called again */
  if (ended) return FALSE;
  /* read input file block */
  block = input_get();
  if (block == NULL) {
    /* no more data available, note the event */
    ended++;
    /* Every block already written (empty input, or the last block came
       back before EOF was noticed): no slave will become readable again,
       so slave_readable() would never quit the loop.  Finish here. */
    if (tail == head) g_main_quit(loop);
    return FALSE;
  }
  /* send block to slave */
  slave_put(slave, block);
  block_free(block);
  /* please call again */
  return TRUE;
}

/* a slave compressor has returned data to us
 *
 * glib G_IO_IN watch callback.  Reads one compressed block.  If it is the
 * block due next (number == tail), it is written together with every
 * queued successor that now follows in sequence.  Otherwise it is parked
 * in the merge queue.  Quits the main loop once input has ended and every
 * block read has been written.  Always asks to be called again.
 */
static gboolean slave_readable(GIOChannel *source, GIOCondition condition, 
			       gpointer data) {
  struct slave *slave = data;
  struct block *next;

  if (verbose > 3) fprintf(stderr, "slave_readable: %s:%d\n", 
			   slave->host, slave->port);

  next = slave_get(slave);

  if (next->number != tail) {
    /* out of order: hold it until its predecessors have been written */
    queue_put(next);
    return TRUE;
  }

  /* in order: emit it, then drain the queue while it continues the run */
  do {
    output_put(next);
    tail++;
    block_free(next);
  } while ((next = queue_get(tail)) != NULL);

  if (ended && tail == head) g_main_quit(loop);
  return TRUE;
}

/* create a new slave */
static struct slave *slave_new(char *host, int port) {
  struct slave *my;
  int stat, ov;
  struct sockaddr_in addr;
  struct in_addr result;
  int version;
  GIOError gioerr;
  int bytes_written;

  my = g_new0(struct slave, 1);
  g_assert (my != NULL);

  my->host = g_strdup(host);
  my->port = port;

  /* create socket and set buffer sizes to prevent blocking */
  my->sock = socket(AF_INET, SOCK_STREAM, 0);
  if (my->sock == -1) { perror("socket"); exit(1); }

  ov = blocksize * 4;
  stat = setsockopt(my->sock, SOL_SOCKET, SO_RCVBUF, (char *) &ov, sizeof(ov));
  if (stat < 0) { perror("setsockopt (SO_RCVBUF)"); }

  ov = blocksize * 4;
  stat = setsockopt(my->sock, SOL_SOCKET, SO_SNDBUF, (char *) &ov, sizeof(ov));
  if (stat < 0) { perror("setsockopt (SO_RCVBUF)"); }

  /* resolve the host address and prepare address structure */
  addr.sin_family = AF_INET;
  if (inet_aton(host, &result)) {
    addr.sin_addr = result;
  } else {
    struct hostent *hostent;

    hostent = gethostbyname(host);
    if (hostent == NULL) { herror(host); exit(1); }

    addr.sin_addr.s_addr = * (long *) hostent->h_addr;
  }
  addr.sin_port = htons(port);

  /* connect to the slave */
  stat = connect(my->sock, (struct sockaddr *) &addr, sizeof(addr));
  if (stat != 0) { perror("connect"); exit(1); }

  /* express an interest in the slave's file descriptors */
  my->channel = g_io_channel_unix_new(my->sock);
  my->writable = g_io_add_watch(my->channel, G_IO_OUT, slave_writable, my);
  my->readable = g_io_add_watch(my->channel, G_IO_IN, slave_readable, my);

  /* send protocol version */
  version = htonl(VERSION);
  gioerr = g_io_channel_write_persist(my->channel, (char *) &version, 
			      sizeof(version), &bytes_written);
  g_assert (bytes_written == sizeof(version));
  g_assert (gioerr == G_IO_ERROR_NONE);

  if (verbose > 3) fprintf(stderr, "slave_new: %s:%d: connected\n", my->host, my->port);
  return my;
}

/* print command line help to stderr and exit with status 1
 *
 * argv: program name shown in the usage line (normally argv[0]).
 */
static void usage(char* argv) {
  fprintf(stderr, "Usage: %s [options] infile outfile slave [slave ...]\n", argv);
  fputs("Options:\n"
	"\t-b blocksize\t\tset block size (multiple of 512 bytes)\n"
	"\t-i imagesize\t\tset input image size (in bytes) if it cannot be determined\n"
	"\t-I forced-imagesize\tforce input image size (in bytes), even if determined\n"
	"\t-v                 \tbe more verbose (accumulates)\n",
	stderr);
  exit(1);
}

/* parse the decimal size argument of option `option`, exiting on bad input
 * (atoi silently wrapped values of 2 GiB and more and accepted junk) */
static unsigned long parse_size(int option, const char *text) {
  char *end;
  unsigned long value;

  errno = 0;
  value = strtoul(text, &end, 10);
  /* strtoul quietly negates "-1", so any sign is rejected outright */
  if (strchr(text, '-') != NULL || end == text || *end != '\0' ||
      errno == ERANGE) {
    fprintf(stderr, "Illegal value for -%c: %s\n", option, text);
    exit(1);
  }
  return value;
}

/* print each slave's share of the processed blocks to stderr */
static void report_slaves(GPtrArray *slaves) {
  struct slave *slave;
  guint i;
  int total = 0;

  /* a GPtrArray is not NULL-terminated: bound by len */
  for (i = 0; i < slaves->len; i++) {
    slave = g_ptr_array_index(slaves, i);
    total += slave->blocks;
  }
  for (i = 0; i < slaves->len; i++) {
    slave = g_ptr_array_index(slaves, i);
    /* total is 0 for empty input; avoid dividing by it */
    fprintf(stderr, "%s processed %d blocks (%d%%)\n", slave->host,
	    slave->blocks, total ? slave->blocks * 100 / total : 0);
  }
}

/* Entry point: compressloop-distributed [options] infile outfile slave...
 * Each slave is host or host:port.  Exit status 0 on success, 1 on any
 * error. */
int main(int argc, char **argv) {
  int optionchar;
  GPtrArray *slaves;
  
  /* process command line options */
  while ((optionchar = getopt(argc, argv, "b:i:I:c:Fv")) != -1) {
    switch (optionchar) {
    case 'b':
      blocksize = parse_size(optionchar, optarg);
      if (blocksize == 0 || blocksize % 512 != 0) {
	fprintf(stderr, "Illegal block size, isn't a multiple of 512.\n");
	/* carrying on would divide by zero or build an unusable image */
	exit(1);
      }
      break;
    case 'i':
      imagesize = parse_size(optionchar, optarg);
      break;
    case 'I':
      imagesize = parse_size(optionchar, optarg);
      forceimagesize = 1;
      break;
    case 'v':
      verbose++;
      break;
    default:
      fprintf(stderr, "Unknown option: %c\n", optionchar);
      usage(argv[0]);
    }
    /* options without an argument leave optarg NULL; %s of NULL is UB */
    if (verbose >= 2)
      fprintf(stderr, "Option %c with possible sub-option %s\n", optionchar,
	      optarg != NULL ? optarg : "(none)");
  }
  
  if (optind == argc) { fprintf(stderr, "ERROR: no infile/outfile present\n"); exit(1); }
  if (optind == argc-1) { fprintf(stderr, "ERROR: no outfile present\n"); exit(1); }
  if (optind == argc-2) { fprintf(stderr, "ERROR: no slaves\n"); exit(1); }
  
  if (blocksize > (unsigned long) (MAX_KMALLOC_SIZE)) {
    fprintf(stderr, "WARNING: Blocksize %lu may be too big for a kmalloc() (%lu max).\n",
	    blocksize, (unsigned long) (MAX_KMALLOC_SIZE));
    sleep(2);
  }
  
  if (sizeof(CLOOP_PREAMBLE) > CLOOP_HEADROOM) {
    fprintf(stderr, "*** Preamble (%u chars) > headroom (%u)\n",
	    (unsigned) sizeof(CLOOP_PREAMBLE), (unsigned) CLOOP_HEADROOM);
    exit(1);
  }

  // open input and output files
  input_open(argv[optind]);
  output_open(argv[optind+1]);

  // create the main loop
  loop = g_main_new(FALSE);
  ended = head = tail = 0;

  queue_init();

  // create the slaves, keep them fed with blocks as their sockets
  // become writeable, and as they return the compressed result
  // enforce block ordering and write to the output file.
  slaves = g_ptr_array_new();
    
  for (optind += 2;optind < argc;optind++) {
    char **split;
    int port;
    struct slave *slave;
    
    split = g_strsplit(argv[optind], ":", 2);
    
    if (split[1] != NULL) port = atoi(split[1]); else port = PORT;
    slave = slave_new(split[0], port);
    g_strfreev(split);
    g_ptr_array_add(slaves, (gpointer) slave);
  }

  // main loop will be terminated when output file is completed
  g_main_run(loop);

  output_close();

  /* report slave processing statistics */
  report_slaves(slaves);
  
  g_ptr_array_free(slaves, TRUE);
  
  return 0;
}

// timings, output to /dev/null
//   distributed 1718284288 1.7Gb to 700MHz Celeron localhost, 500MHz Pentium III, 350MHz Pentium II took 6m46s.
//   normal 1.7Gb to 700MHz Celeron took 13m56s.

// watch -d --interval=1 "(netstat -an|head -2|tail -1;netstat -an|grep 8522)"
// 127.0.0.1 processed 7253 blocks
// 10.0.0.5 processed 10611 blocks
// 10.0.0.16 processed 8355 blocks

// 5m 15s
// emma processed 8167 blocks
// quozl processed 7125 blocks
// easy processed 5958 blocks
// lenny processed 4969 blocks

// 7m32.517s
// quozl processed 12533 blocks
// emma processed 13686 blocks

// 6m41s
// emma processed 15104 blocks (57%)
// quozl processed 11115 blocks (42%)

// 5m59.053s (to file, not /dev/null like other tests)
// emma processed 8617 blocks (32%)
// quozl processed 7029 blocks (26%)
// easy processed 5871 blocks (22%)
// lenny processed 4702 blocks (17%)
