#!/usr/bin/perl

use strict;
use warnings;

use Fcntl qw(:flock SEEK_END);
use Getopt::Long qw(GetOptionsFromArray);
use File::Path qw(make_path);
use JSON;
use IO::File;
use String::ShellQuote 'shell_quote';

# Name of the tool; the state directory, cron file and binary path below are
# derived from it.
my $PROGNAME = "pve-zsync";
# Directory that holds the state file and the lock files.
my $CONFIG_PATH = "/var/lib/${PROGNAME}";
# JSON file with the per-job sync state (see read_state/update_state).
my $STATE = "${CONFIG_PATH}/sync_state";
# cron.d file in which the scheduled jobs are stored (see read_cron/update_cron).
my $CRONJOBS = "/etc/cron.d/$PROGNAME";
my $PATH = "/usr/sbin";
# Directories searched for "<vmid>.conf" of qemu guests and LXC containers.
my $PVE_DIR = "/etc/pve/local";
my $QEMU_CONF = "${PVE_DIR}/qemu-server";
my $LXC_CONF = "${PVE_DIR}/lxc";
my $PROG_PATH = "$PATH/${PROGNAME}";
# Minute step used for new cron entries: format_job() emits "*/$INTERVAL * * * *".
my $INTERVAL = 15;
# Declared without an initializer on purpose: the BEGIN block below assigns it
# at compile time, and an initializer here would overwrite that at run time.
my $DEBUG;

# Decide at compile time whether debugging is on, and only then load
# Data::Dumper (run_cmd() dumps commands and their output when $DEBUG is set).
BEGIN {
    $DEBUG = 0; # change default here. not above on declaration!
    $DEBUG ||= $ENV{ZSYNC_DEBUG};
    if ($DEBUG) {
	require Data::Dumper;
	Data::Dumper->import();
    }
}

# Building blocks for address matching. All groups in these strings are
# non-capturing, so embedding them does not shift the capture numbers of
# $TARGETRE below.

# one decimal IPv4 octet, 0-255
my $IPV4OCTET = "(?:25[0-5]|(?:[1-9]|1[0-9]|2[0-4])?[0-9])";
# dotted quad
my $IPV4RE = "(?:(?:$IPV4OCTET\\.){3}$IPV4OCTET)";
# one IPv6 group of 1-4 hex digits
my $IPV6H16 = "(?:[0-9a-fA-F]{1,4})";
# the last 32 bits of an IPv6 address: embedded IPv4 or two hex groups
my $IPV6LS32 = "(?:(?:$IPV4RE|$IPV6H16:$IPV6H16))";

# IPv6 address; one alternative per possible position of the '::' shorthand
# (the first alternative is the fully written-out form without '::').
my $IPV6RE = "(?:" .
    "(?:(?:" .                             "(?:$IPV6H16:){6})$IPV6LS32)|" .
    "(?:(?:" .                           "::(?:$IPV6H16:){5})$IPV6LS32)|" .
    "(?:(?:(?:" .              "$IPV6H16)?::(?:$IPV6H16:){4})$IPV6LS32)|" .
    "(?:(?:(?:(?:$IPV6H16:){0,1}$IPV6H16)?::(?:$IPV6H16:){3})$IPV6LS32)|" .
    "(?:(?:(?:(?:$IPV6H16:){0,2}$IPV6H16)?::(?:$IPV6H16:){2})$IPV6LS32)|" .
    "(?:(?:(?:(?:$IPV6H16:){0,3}$IPV6H16)?::(?:$IPV6H16:){1})$IPV6LS32)|" .
    "(?:(?:(?:(?:$IPV6H16:){0,4}$IPV6H16)?::" .           ")$IPV6LS32)|" .
    "(?:(?:(?:(?:$IPV6H16:){0,5}$IPV6H16)?::" .            ")$IPV6H16)|" .
    "(?:(?:(?:(?:$IPV6H16:){0,6}$IPV6H16)?::" .                    ")))";

my $HOSTv4RE0 = "(?:[\\w\\.\\-_]+|$IPV4RE)";       # hostname or ipv4 address
my $HOSTv4RE1 = "(?:$HOSTv4RE0|\\[$HOSTv4RE0\\])"; # these may be in brackets, too
my $HOSTRE = "(?:$HOSTv4RE1|\\[$IPV6RE\\])";       # ipv6 must always be in brackets
# targets are either a VMID, or a 'host:zpool/path' with 'host:' being optional
# Captures: $1 = host (brackets included), $2 = VMID or complete dataset,
# $3 = the '/path' part following the pool name, if any.
my $TARGETRE = qr!^(?:($HOSTRE):)?(\d+|(?:[\w\-_]+)(/.+)?)$!;

# Matches the "<key>: " prefix of guest config lines that describe a disk or
# mount point (virtioN, ideN, scsiN, sataN, efidiskN, mpN, rootfs).
my $DISK_KEY_RE = qr/^(?:(?:(?:virtio|ide|scsi|sata|efidisk|mp)\d+)|rootfs): /;

# Identifier of this running process ("pid:starttime:boot_id"); sync() stores
# it in the job state so that stale "waiting"/"syncing" entries can be detected.
my $INSTANCE_ID = get_instance_id($$);

my $command = $ARGV[0];

# Every command except 'help' and 'printpod' requires the external tools;
# check_bin() dies if one of them is not found in $PATH.
if (defined($command) && $command ne 'help' && $command ne 'printpod') {
    check_bin ('cstream');
    check_bin ('zfs');
    check_bin ('ssh');
    check_bin ('scp');
}

# Turn termination signals into a die() so that the eval blocks in sync() can
# record the error state of the job.
# NOTE(review): SIGKILL cannot be caught, so the KILL entry is presumably
# without effect; $! is not related to the received signal - confirm intent.
$SIG{TERM} = $SIG{QUIT} = $SIG{PIPE} = $SIG{HUP} = $SIG{KILL} = $SIG{INT} =  sub {
    die "Signaled, aborting sync: $!\n";
};

# Look up an executable named $bin in the directories listed in $ENV{PATH}.
# Returns the full path of the first hit; dies if there is none.
sub check_bin {
    my ($bin)  = @_;

    for my $dir (split(/:/, $ENV{PATH})) {
	my $candidate = join('/', $dir, $bin);
	return $candidate if -x $candidate;
    }

    die "unable to find command '$bin'\n";
}

# Read $filename.
# With a true $one_line_only only the first line is returned as a string
# (undef for an empty file); otherwise an array reference holding all lines.
# Line endings are kept. Dies if the file cannot be opened.
sub read_file {
    my ($filename, $one_line_only) = @_;

    my $fh = IO::File->new($filename, "r");
    die "Could not open file ${filename}: $!\n" if !$fh;

    my $text;
    if ($one_line_only) {
	$text = <$fh>;
    } else {
	$text = [ <$fh> ];
    }

    close($fh);

    return $text;
}

# Shorten $path for display so that it fits into $maxlen characters.
# Repeated slashes are collapsed first. Paths that are too long get parts
# replaced by '..', preferring to keep the last path component readable.
sub cut_target_width {
    my ($path, $maxlen) = @_;

    $path =~ s!/+!/!g;
    return $path if length($path) <= $maxlen;

    # no separator at all: keep the rightmost characters
    if ($path !~ m!/!) {
	return '..' . substr($path, 2 - $maxlen);
    }

    # split off the last component (a trailing slash stays attached to it)
    $path =~ s!/([^/]+/?)$!!;
    my $tail = $1;
    my $tail_len = length($tail);

    return "../$tail" if $tail_len + 3 == $maxlen;
    return '..' . substr($tail, 2 - $maxlen) if $tail_len + 2 >= $maxlen;

    # detach the first "/component" that is followed by a slash or the end
    $path =~ s!(/[^/]+)(?:/|$)!!;
    my $head = $1;
    my $remaining = $maxlen - (length($head) + $tail_len) - 4; # 4 for "/../"

    if ($remaining < 0) {
	my $keep = $maxlen - $tail_len - 3; # 3 for "../"
	return substr($head, 0, $keep) . "../$tail";
    }

    # replace the middle of what is left by '..'
    substr($path, ($remaining/2), (length($path)-$remaining), '..');
    return join('/', $head, $path, $tail);
}

# Run $code while holding an exclusive flock() on the file $lock_fn.
#
# The lock file is created (or truncated) if necessary. The call blocks until
# the lock is available. The lock is released and the handle closed before
# returning, also when $code dies; in that case the error is re-thrown.
#
# Returns the (scalar) result of $code.
# Dies if the lock file cannot be opened or locked.
sub locked {
    my ($lock_fn, $code) = @_;

    # explicit mode argument: the file name is never interpreted as part of
    # the open mode, and a failed open is reported instead of surfacing as an
    # obscure flock() error
    my $lock_fh = IO::File->new($lock_fn, 'w')
	or die "Could not open lock file $lock_fn: $!\n";

    flock($lock_fh, LOCK_EX) || die "Couldn't acquire lock - $!\n";
    my $res = eval { $code->() };
    my $err = $@;

    flock($lock_fh, LOCK_UN) || warn "Error unlocking - $!\n";
    close($lock_fh);

    die "$err" if $err;

    return $res;
}

# Returns the complete $status hash if it holds a true 'status' value for the
# job identified by $source->{all} and $name, undef otherwise.
sub get_status {
    my ($source, $name, $status) = @_;

    return undef if !$status->{$source->{all}}->{$name}->{status};

    return $status;
}

# Check whether the ZFS dataset $dataset exists, by running 'zfs list' either
# locally or, if $ip is set, via ssh as $user on that host.
# Returns 1 if the command succeeded, 0 otherwise.
sub check_dataset_exists {
    my ($dataset, $ip, $user) = @_;

    my @remote = $ip ? ('ssh', "$user\@$ip", '--') : ();
    my $cmd = [ @remote, 'zfs', 'list', '-H', '--', $dataset ];

    eval { run_cmd($cmd) };

    return $@ ? 0 : 1;
}

# Create the ZFS file system $file_system with 'zfs create', either locally
# or, if $ip is set, via ssh as $user on that host. Dies if the command fails.
sub create_file_system {
    my ($file_system, $ip, $user) = @_;

    my @remote = $ip ? ('ssh', "$user\@$ip", '--') : ();

    return run_cmd([ @remote, 'zfs', 'create', $file_system ]);
}

# Split a target specification ('[host:]<VMID>' or '[host:]<pool>[/path]')
# into its parts. Returns a hash reference with
#   all       - everything after the optional 'host:' prefix
#   ip        - the host without surrounding brackets (only if given)
#   vmid      - set instead of 'pool' if the first component is numeric
#   pool      - the first component of the dataset
#   last_part - the last component, if there is more than one
#   path      - the components between pool and last_part, if any
# Dies if $text does not match $TARGETRE.
sub parse_target {
    my ($text) = @_;

    my $errstr = "$text : is not a valid input! Use [IP:]<VMID> or [IP:]<ZFSPool>[/Path]";

    my @match = $text =~ $TARGETRE;
    die "$errstr\n" if !@match;
    my ($host, $all) = @match;

    my $target = { all => $all };
    if ($host) {
	$host =~ s/^\[(.*)\]$/$1/;
	$target->{ip} = $host;
    }

    my @parts = split('/', $all);
    my $pool = shift(@parts);
    die "$errstr\n" if !$pool;

    if ($pool =~ m/^\d+$/) {
	$target->{vmid} = $pool;
    } else {
	$target->{pool} = $pool;
    }

    return $target if !@parts;

    $target->{last_part} = pop(@parts);

    # NOTE(review): for targets with a host one more trailing component is
    # dropped before 'path' is built - the reason is not evident from here.
    pop(@parts) if $target->{ip};

    $target->{path} = join('/', @parts) if @parts;

    return $target;
}

# Load the job configuration from the cron file.
# Returns the structure built by encode_cron(). If the cron file does not
# exist yet, an empty one is created and undef is returned.
sub read_cron {

    if (-e $CRONJOBS) {
	my $lines = read_file($CRONJOBS, 0);
	return encode_cron(@{$lines});
    }

    # first use: initialize the file
    my $new_fh = IO::File->new("> $CRONJOBS")
	or die "Could not create $CRONJOBS: $!\n";
    close($new_fh);

    return undef;
}

# Parse command line style arguments into a parameter hash.
# Every known parameter is present as key (undef if not given); name,
# maxsnap, method, source_user and dest_user get default values.
# Dies with "can't parse options" if Getopt::Long rejects the arguments.
sub parse_argv {
    my (@arg) = @_;

    # parameter key => Getopt::Long option specification
    my @spec = (
	[ dest => 'dest=s' ],
	[ source => 'source=s' ],
	[ verbose => 'verbose' ],
	[ limit => 'limit=i' ],
	[ maxsnap => 'maxsnap=i' ],
	[ name => 'name=s' ],
	[ skip => 'skip' ],
	[ method => 'method=s' ],
	[ source_user => 'source-user=s' ],
	[ dest_user => 'dest-user=s' ],
	[ prepend_storage_id => 'prepend-storage-id' ],
	[ properties => 'properties' ],
	[ dest_config_path => 'dest-config-path=s' ],
    );

    my $param = {};
    $param->{$_->[0]} = undef for @spec;

    my @getopt_args = map { ($_->[1], \$param->{$_->[0]}) } @spec;
    my ($ret) = GetOptionsFromArray(\@arg, @getopt_args);

    die "can't parse options\n" if $ret == 0;

    my %default = (
	name => "default",
	maxsnap => 1,
	method => "ssh",
	source_user => "root",
	dest_user => "root",
    );
    foreach my $key (keys %default) {
	$param->{$key} //= $default{$key};
    }

    return $param;
}

# Copy the stored state of a job (state, lsync, vm_type, instance_id and the
# snapN entries) from the state file into $job. Returns $job.
sub add_state_to_job {
    my ($job) = @_;

    my $states = read_state();
    my $state = $states->{$job->{source}}->{$job->{name}};

    foreach my $key (qw(state lsync vm_type instance_id)) {
	$job->{$key} = $state->{$key};
    }

    # snapshots are numbered snap0, snap1, ...; stop at the first gap
    my $i = 0;
    while ($state->{"snap$i"}) {
	$job->{"snap$i"} = $state->{"snap$i"};
	$i++;
    }

    return $job;
}

# Build the job configuration from the lines of the cron file.
# Each line is split on whitespace and run through parse_argv(); lines that
# yield both a source and a dest end up as $cfg->{<source>}->{<name>}.
# Processing stops at the first line that is false (e.g. an empty string).
sub encode_cron {
    my (@text) = @_;

    my $cfg = {};

    foreach my $line (@text) {
	last if !$line;

	my $param = parse_argv(split('\s', $line));

	next if !($param->{source} && $param->{dest});

	my $source = delete $param->{source};
	my $name = delete $param->{name};

	$cfg->{$source}->{$name} = $param;
    }

    return $cfg;
}

# Derive a job hash from the parsed command line parameters.
# The method is 'local' if neither source nor dest name a host and 'ssh'
# otherwise. dest, maxsnap and dest_config_path are only set if given.
# Dies if source or dest are not valid targets.
sub param_to_job {
    my ($param) = @_;

    my $source = parse_target($param->{source});
    my $dest = $param->{dest} ? parse_target($param->{dest}) : undef;

    my $is_remote = $source->{ip} || $dest->{ip};

    my $job = {
	name => $param->{name} ? $param->{name} : "default",
	method => $is_remote ? "ssh" : "local",
	limit => $param->{limit},
	source => $param->{source},
	source_user => $param->{source_user},
	dest_user => $param->{dest_user},
	prepend_storage_id => !!$param->{prepend_storage_id},
	properties => !!$param->{properties},
    };

    foreach my $key (qw(dest maxsnap dest_config_path)) {
	$job->{$key} = $param->{$key} if $param->{$key};
    }

    return $job;
}

# Load the job states from the state file and return the decoded JSON.
# If the state file does not exist yet, the config directory and a state
# file containing an empty JSON object are created and undef is returned.
sub read_state {

    if (-e $STATE) {
	my $text = read_file($STATE, 1);
	return decode_json($text);
    }

    make_path($CONFIG_PATH);

    my $new_fh = IO::File->new("> $STATE")
	or die "Could not create $STATE: $!\n";
    print $new_fh "{}";
    close($new_fh);

    return undef;
}

# Store the state of $job in the state file.
#
# If $job->{state} is "del" the entry of the job is removed (and the entry of
# its source too, once it has no jobs left). Otherwise state, lsync,
# instance_id, vm_type and the snapN values of $job are written.
#
# The new content is written to "$STATE.new" first and then renamed over the
# state file. Returns the complete, updated state hash.
# Dies if the new file cannot be opened, written, closed or renamed.
sub update_state {
    my ($job) = @_;

    # a missing or unreadable state file is treated like an empty one
    my $text = eval { read_file($STATE, 1); };

    my $out_fh = IO::File->new("> $STATE.new");
    die "Could not open file ${STATE}.new: $!\n" if !$out_fh;

    my $states = {};
    my $state = {};
    if ($text){
	$states = decode_json($text);
	$state = $states->{$job->{source}}->{$job->{name}};
    }

    if ($job->{state} ne "del") {
	$state->{state} = $job->{state};
	$state->{lsync} = $job->{lsync};
	$state->{instance_id} = $job->{instance_id};
	$state->{vm_type} = $job->{vm_type};

	for (my $i = 0; $job->{"snap$i"} ; $i++) {
	    $state->{"snap$i"} = $job->{"snap$i"};
	}
	$states->{$job->{source}}->{$job->{name}} = $state;
    } else {

	delete $states->{$job->{source}}->{$job->{name}};
	delete $states->{$job->{source}} if !keys %{$states->{$job->{source}}};
    }

    $text = encode_json($states);

    # Check every step: an unnoticed short write followed by the rename would
    # replace the valid state file with a truncated one.
    print $out_fh $text or die "can't write to $STATE.new: $!\n";
    close($out_fh) or die "can't close $STATE.new: $!\n";

    rename "$STATE.new", $STATE or die "can't move $STATE.new: $!\n";

    return $states;
}

# Rewrite the cron file so that it reflects $job.
#
# The line belonging to the job (same source and name) is replaced by a newly
# formatted one, keeping its schedule, or dropped if $job->{state} is "del".
# If no such line exists a new one is appended. All other lines are kept, and
# the SHELL/PATH header is added if it is not found within the first lines.
#
# The new content is written to "$CRONJOBS.new" and then renamed over the
# cron file. Dies if reading, writing or renaming fails.
sub update_cron {
    my ($job) = @_;

    my $updated;
    my $has_header;
    my $line_no = 0;
    my $text = "";
    my $header = "SHELL=/bin/sh\n";
    $header .= "PATH=/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin\n\n";

    my $current = read_file($CRONJOBS, 0);

    foreach my $line (@{$current}) {
	chomp($line);
	# \Q...\E: source and name are literal text, not patterns. Without the
	# quoting a '.' matches any character and a bracketed IPv6 host is
	# read as a character class, so the job's own line is never found.
	if ($line =~ m/source \Q$job->{source}\E .*name \Q$job->{name}\E /) {
	    $updated = 1;
	    next if $job->{state} eq "del";
	    $text .= format_job($job, $line);
	} else {
	    if (($line_no < 3) && ($line =~ /^(PATH|SHELL)/ )) {
		$has_header = 1;
	    }
	    $text .= "$line\n";
	}
	$line_no++;
    }

    if (!$has_header) {
	$text = "$header$text";
    }

    if (!$updated) {
	$text .= format_job($job);
    }
    my $new_fh = IO::File->new("> ${CRONJOBS}.new");
    die "Could not open file ${CRONJOBS}.new: $!\n" if !$new_fh;

    print $new_fh $text or die "can't write to $CRONJOBS.new: $!\n";
    # buffered write errors only show up when the handle is closed
    close ($new_fh) or die "can't close $CRONJOBS.new: $!\n";

    rename "${CRONJOBS}.new", $CRONJOBS or die "can't move $CRONJOBS.new: $!\n";
}

# Format the cron line of $job (including the trailing newline).
#
# $line is the existing cron line of the job, if there is one; its five
# schedule fields are kept. Without $line, or if no schedule can be found in
# it, the default schedule of every $INTERVAL minutes is used.
# Jobs in state "stopped" are written as a comment ('#' prefix).
sub format_job {
    my ($job, $line) = @_;
    my $text = "";

    if ($job->{state} eq "stopped") {
	$text = "#";
    }
    # Only use the capture if the match succeeded; otherwise $1 would be
    # undef or left over from an unrelated earlier match.
    if ($line && $line =~ /^#*\s*((?:\S+\s+){4}\S+)\s+root/) {
	$text .= $1;
    } else {
	$text .= "*/$INTERVAL * * * *";
    }
    $text .= " root";
    $text .= " $PROGNAME sync --source $job->{source} --dest $job->{dest}";
    $text .= " --name $job->{name} --maxsnap $job->{maxsnap}";
    $text .= " --limit $job->{limit}" if $job->{limit};
    $text .= " --method $job->{method}";
    $text .= " --verbose" if $job->{verbose};
    $text .= " --source-user $job->{source_user}";
    $text .= " --dest-user $job->{dest_user}";
    $text .= " --prepend-storage-id" if $job->{prepend_storage_id};
    $text .= " --properties" if $job->{properties};
    $text .= " --dest-config-path $job->{dest_config_path}" if $job->{dest_config_path};
    $text .= "\n";

    return $text;
}

# Returns a table (one string) of all jobs found in the cron file, with their
# state, time of the last sync, guest type and connection method.
sub list {

    my $cfg = read_cron();

    my $row_format = "%-25s%-25s%-10s%-20s%-6s%-5s\n";
    my $list = sprintf($row_format, "SOURCE", "NAME", "STATE", "LAST SYNC", "TYPE", "CON");

    my $states = read_state();

    foreach my $source (sort keys %{$cfg}) {
	foreach my $name (sort keys %{$cfg->{$source}}) {
	    my $state = $states->{$source}->{$name};

	    $list .= sprintf(
		$row_format,
		cut_target_width($source, 25),
		cut_target_width($name, 25),
		$state->{state},
		$state->{lsync},
		$state->{vm_type} // "undef",
		$cfg->{$source}->{$name}->{method},
	    );
	}
    }

    return $list;
}

# Determine the guest type of the VMID in $target by looking for its config
# file, locally or (if $target->{ip} is set) via ssh as $user.
# Returns "qemu", "lxc", or undef if $target has no vmid or no config exists.
sub vm_exists {
    my ($target, $user) = @_;

    return undef if !defined($target->{vmid});

    my $conf_fn = "$target->{vmid}.conf";

    # checked in this order
    my @candidates = (
	[ "qemu", "$QEMU_CONF/$conf_fn" ],
	[ "lxc", "$LXC_CONF/$conf_fn" ],
    );

    foreach my $candidate (@candidates) {
	my ($type, $conf_path) = @$candidate;

	if ($target->{ip}) {
	    my $cmd = ['ssh', "$user\@$target->{ip}", '--', '/bin/ls', $conf_path];
	    return $type if eval { run_cmd($cmd) };
	} else {
	    return $type if -f $conf_path;
	}
    }

    return undef;
}

# Create a new sync job ('create' command).
#
# While holding the cron/state lock: distribute the ssh key to remote hosts,
# verify that the destination and the source exist, refuse to overwrite an
# existing job, and finally store the job in the cron file and the state file.
# Unless $param->{skip} is set, a first sync is started right away; if that
# sync fails the job is removed again and the error is printed.
sub init {
    my ($param) = @_;

    locked("$CONFIG_PATH/cron_and_state.lock", sub {
	my $cfg = read_cron();

	my $job = param_to_job($param);

	$job->{state} = "ok";
	$job->{lsync} = 0;

	my $source = parse_target($param->{source});
	my $dest = parse_target($param->{dest});

	# install the local public key on remote hosts
	if (my $ip =  $dest->{ip}) {
	    run_cmd(['ssh-copy-id', '-i', '/root/.ssh/id_rsa.pub', "$param->{dest_user}\@$ip"]);
	}

	if (my $ip =  $source->{ip}) {
	    run_cmd(['ssh-copy-id', '-i', '/root/.ssh/id_rsa.pub', "$param->{source_user}\@$ip"]);
	}

	die "Pool $dest->{all} does not exist\n"
	    if !check_dataset_exists($dest->{all}, $dest->{ip}, $param->{dest_user});

	# a VMID is not a dataset, its existence is checked via vm_exists() below
	if (!defined($source->{vmid})) {
	    die "Pool $source->{all} does not exist\n"
		if !check_dataset_exists($source->{all}, $source->{ip}, $param->{source_user});
	}

	my $vm_type = vm_exists($source, $param->{source_user});
	$job->{vm_type} = $vm_type;
	$source->{vm_type} = $vm_type;

	die "VM $source->{vmid} doesn't exist\n" if $source->{vmid} && !$vm_type;

	die "Config already exists\n" if $cfg->{$job->{source}}->{$job->{name}};

	#check if vm has zfs disks if not die;
	get_disks($source, $param->{source_user}) if $source->{vmid};

	update_cron($job);
	update_state($job);
    }); #cron and state lock

    return if $param->{skip};

    # initial sync; on failure remove the job that was just created
    eval { sync($param) };
    if (my $err = $@) {
	destroy_job($param);
	print $err;
    }
}

# Look up the job given by $param->{source} and $param->{name} in the cron
# file and return it, completed with name, source and its stored state.
# Dies if no such job exists.
sub get_job {
    my ($param) = @_;

    my ($source, $name) = ($param->{source}, $param->{name});

    my $cfg = read_cron();

    my $job = $cfg->{$source}->{$name}
	or die "Job  with source $source and name $name does not exist\n";

    $job->{name} = $name;
    $job->{source} = $source;

    return add_state_to_job($job);
}

# Remove the job given by $param from the cron file and the state file.
# Dies if the job does not exist.
sub destroy_job {
    my ($param) = @_;

    my $remove = sub {
	my $job = get_job($param);

	# "del" makes both update functions drop the entry
	$job->{state} = "del";

	update_cron($job);
	update_state($job);
    };

    locked("$CONFIG_PATH/cron_and_state.lock", $remove);
}

# Build an identifier for the process $pid that stays unique across PID reuse
# and reboots: "<pid>:<start time of the process>:<boot id>".
#
# The start time is field 22 of /proc/<pid>/stat. Field 2 of that file is the
# command name in parentheses, which may itself contain spaces or
# parentheses, so the line is only split after the LAST ')'.
#
# Dies if the stat file or the boot id cannot be read.
sub get_instance_id {
    my ($pid) = @_;

    my $stat = read_file("/proc/$pid/stat", 1)
	or die "unable to read process stats\n";
    my $boot_id = read_file("/proc/sys/kernel/random/boot_id", 1)
	or die "unable to read boot ID\n";

    my $starttime;
    if ($stat =~ m/^.*\)\s+(.*)$/s) {
	# what follows the command name starts with field 3, so field 22
	# is found at index 19
	my @fields = split(/\s+/, $1);
	$starttime = $fields[19];
    } else {
	# unexpected format: fall back to plain whitespace splitting
	my @fields = split(/\s+/, $stat);
	$starttime = $fields[21];
    }
    chomp($boot_id);

    return "${pid}:${starttime}:${boot_id}";
}

# Check whether the process that produced $instance_id (see
# get_instance_id()) is still running. Returns a true value only if a process
# with that PID exists and yields exactly the same identifier.
sub instance_exists {
    my ($instance_id) = @_;

    return 0 if !defined($instance_id);
    return 0 if $instance_id !~ m/^([1-9][0-9]*):/;

    my $pid = $1;

    # reading /proc fails if the process is gone - that simply means "no"
    my $actual_id = eval { get_instance_id($pid); };

    return defined($actual_id) && $actual_id eq $instance_id;
}

# Run one sync from $param->{source} to $param->{dest}.
#
# If a job for this source/name exists in the cron file, its state is tracked
# in the state file: "waiting" while queued, "syncing" while running, then
# "ok" or "error". Syncs without a configured job run without state tracking.
#
# Two locks are used: the cron/state lock protects short accesses to the cron
# and state files, the sync lock is held for the whole transfer.
#
# Dies if the job is already queued or running, if the job is disabled, or
# if the sync itself fails.
sub sync {
    my ($param) = @_;

    my $job;

    # Phase 1: mark the job as waiting, unless a live instance already
    # handles it.
    locked("$CONFIG_PATH/cron_and_state.lock", sub {
	eval { $job = get_job($param) };

	if ($job) {
	    my $state = $job->{state} // 'ok';
	    # a state left behind by a process that is gone does not count
	    $state = 'ok' if !instance_exists($job->{instance_id});

	    if ($state eq "syncing" || $state eq "waiting") {
		die "Job --source $param->{source} --name $param->{name} is already scheduled to sync\n";
	    }

	    $job->{state} = "waiting";
	    $job->{instance_id} = $INSTANCE_ID;

	    update_state($job);
	}
    });

    # Phase 2: the actual sync, under the sync lock.
    locked("$CONFIG_PATH/sync.lock", sub {

	my $date = get_date();

	my $dest;
	my $source;
	my $vm_type;

	locked("$CONFIG_PATH/cron_and_state.lock", sub {
	    #job might've changed while we waited for the sync lock, but we can be sure it's not syncing
	    eval { $job = get_job($param); };

	    if ($job && defined($job->{state}) && $job->{state} eq "stopped") {
		die "Job --source $param->{source} --name $param->{name} has been disabled\n";
	    }

	    $dest = parse_target($param->{dest});
	    $source = parse_target($param->{source});

	    $vm_type = vm_exists($source, $param->{source_user});
	    $source->{vm_type} = $vm_type;

	    if ($job) {
		$job->{state} = "syncing";
		$job->{vm_type} = $vm_type if !$job->{vm_type};
		update_state($job);
	    }
	}); #cron and state lock

	# Sync a single dataset: look up the existing snapshots on the
	# destination, create a new snapshot on the source, send it, and
	# destroy the oldest snapshot if snapshot_get() flagged one.
	my $sync_path = sub {
	    my ($source, $dest, $job, $param, $date) = @_;

	    ($dest->{old_snap}, $dest->{last_snap}) = snapshot_get($source, $dest, $param->{maxsnap}, $param->{name}, $param->{dest_user});

	    prepare_prepended_target($source, $dest, $param->{dest_user}) if defined($dest->{prepend});

	    snapshot_add($source, $dest, $param->{name}, $date, $param->{source_user}, $param->{dest_user});

	    send_image($source, $dest, $param);

	    snapshot_destroy($source, $dest, $param->{method}, $dest->{old_snap}, $param->{source_user}, $param->{dest_user}) if ($source->{destroy} && $dest->{old_snap});

	};

	eval{
	    if ($source->{vmid}) {
		# guest: sync every disk found in its config, then the config
		die "VM $source->{vmid} doesn't exist\n" if !$vm_type;
		die "source-user has to be root for syncing VMs\n" if ($param->{source_user} ne "root");
		my $disks = get_disks($source, $param->{source_user});

		foreach my $disk (sort keys %{$disks}) {
		    # point $source at the dataset of the current disk
		    $source->{all} = $disks->{$disk}->{all};
		    $source->{pool} = $disks->{$disk}->{pool};
		    $source->{path} = $disks->{$disk}->{path} if $disks->{$disk}->{path};
		    $source->{last_part} = $disks->{$disk}->{last_part};

		    $dest->{prepend} = $disks->{$disk}->{storage_id}
			if $param->{prepend_storage_id};

		    &$sync_path($source, $dest, $job, $param, $date);
		}
		if ($param->{method} eq "ssh" && ($source->{ip} || $dest->{ip})) {
		    send_config($source, $dest,'ssh', $param->{source_user}, $param->{dest_user}, $param->{dest_config_path});
		} else {
		    send_config($source, $dest,'local', $param->{source_user}, $param->{dest_user}, $param->{dest_config_path});
		}
	    } else {
		# plain dataset
		&$sync_path($source, $dest, $job, $param, $date);
	    }
	};
	if (my $err = $@) {
	    # record the failure in the job state, then pass the error on
	    locked("$CONFIG_PATH/cron_and_state.lock", sub {
		eval { $job = get_job($param); };
		if ($job) {
		    $job->{state} = "error";
		    delete $job->{instance_id};
		    update_state($job);
		}
	    });
	    print "Job --source $param->{source} --name $param->{name} got an ERROR!!!\nERROR Message:\n";
	    die "$err\n";
	}

	# Success: store the time of this sync. The job is re-read so that a
	# job disabled in the meantime stays "stopped".
	locked("$CONFIG_PATH/cron_and_state.lock", sub {
	    eval { $job = get_job($param); };
	    if ($job) {
		if (defined($job->{state}) && $job->{state} eq "stopped") {
		    $job->{state} = "stopped";
		} else {
		    $job->{state} = "ok";
		}
		$job->{lsync} = $date;
		delete $job->{instance_id};
		update_state($job);
	    }
	});
    }); #sync lock
}

# List the snapshots of the target dataset on the destination, newest first.
#
# Returns ($old_snap, $last_snap): $last_snap is the name of the newest
# snapshot, $old_snap the name of the last 'rep_<name>_<date>' snapshot seen
# before the scan ended. The scan ends after $max_snap such snapshots, in
# which case $source->{destroy} is set to 1.
# Returns undef if listing fails or no snapshot is found.
sub snapshot_get{
    my ($source, $dest, $max_snap, $name, $dest_user) = @_;

    my @ssh = $dest->{ip} ? ('ssh', "$dest_user\@$dest->{ip}", '--') : ();
    my $cmd = [
	@ssh,
	'zfs', 'list', '-r', '-t', 'snapshot', '-Ho', 'name', '-S', 'creation',
	target_dataset($source, $dest),
    ];

    my $raw = eval { run_cmd($cmd) };
    return undef if $@; #this means the volume doesn't exist on dest yet

    my $old_snap;
    my $last_snap = undef;
    my $seen = 0;

    # consume the output line by line
    while ($raw && $raw =~ s/^(.*?)(\n|$)//) {
	my $line = $1;

	if (!$last_snap && $line =~ m/@(.*)$/) {
	    $last_snap = $1;
	}

	next if $line !~ m/(rep_\Q${name}\E_\d{4}-\d{2}-\d{2}_\d{2}:\d{2}:\d{2})$/;

	$old_snap = $1;
	$seen++;
	if ($seen == $max_snap) {
	    $source->{destroy} = 1;
	    last;
	}
    }

    return ($old_snap, $last_snap) if $last_snap;

    return undef;
}

# Create the snapshot 'rep_<name>_<date>' of the source dataset and remember
# its name in $source->{new_snap}.
# If creating it fails, snapshot_destroy() is called for that snapshot name
# and the error is re-thrown.
sub snapshot_add {
    my ($source, $dest, $name, $date, $source_user, $dest_user) = @_;

    my $snap_name = join('_', "rep", $name, $date);
    $source->{new_snap} = $snap_name;

    my @ssh = $source->{ip} ? ('ssh', "$source_user\@$source->{ip}", '--') : ();

    eval { run_cmd([ @ssh, 'zfs', 'snapshot', "$source->{all}\@$snap_name" ]) };

    if (my $err = $@) {
	snapshot_destroy($source, $dest, 'ssh', $snap_name, $source_user, $dest_user);
	die "$err\n";
    }
}

# Fetch the configuration of the guest $target->{vmid} ('qm config' for qemu,
# 'pct config' for lxc; via ssh as $user if $target->{ip} is set) and return
# its disks as parsed by parse_disks().
# Dies if $target->{vm_type} is neither 'qemu' nor 'lxc'.
sub get_disks {
    my ($target, $user) = @_;

    my @ssh = $target->{ip} ? ('ssh', "$user\@$target->{ip}", '--') : ();

    my %config_tool = (
	qemu => 'qm',
	lxc => 'pct',
    );
    my $tool = $config_tool{$target->{vm_type}};
    die "VM Type unknown\n" if !$tool;

    my $res = run_cmd([ @ssh, $tool, 'config', $target->{vmid} ]);

    my $disks = parse_disks($res, $target->{ip}, $target->{vm_type}, $user);

    return $disks;
}

# Run a command through the shell and return its output (stdout and stderr
# combined, trailing newline removed).
#
# $cmd is either a complete command string or an array reference. Array
# elements are shell-quoted and joined with spaces; elements that are scalar
# references are inserted unquoted (used for shell syntax such as a pipe).
# Dies with the command and its output if the exit status is not 0.
sub run_cmd {
    my ($cmd) = @_;

    if ($DEBUG) {
	print "Start CMD\n";
	print Dumper $cmd;
    }

    if (ref($cmd) eq 'ARRAY') {
	my @words;
	foreach my $part (@$cmd) {
	    push @words, ref($part) ? $$part : shell_quote($part);
	}
	$cmd = join(' ', @words);
    }

    my $output = `$cmd 2>&1`;

    die "COMMAND:\n\t$cmd\nGET ERROR:\n\t$output" if 0 != $?;

    chomp($output);

    if ($DEBUG) {
	print Dumper $output;
	print "END CMD\n";
    }

    return $output;
}

# Extract the ZFS backed disks from the guest configuration text $text (the
# output of 'qm config' / 'pct config').
#
# $ip and $user, if $ip is set, are used to run 'pvesm path' via ssh in
# order to resolve each volume to its path. $vm_type is 'qemu' or 'lxc'.
#
# Returns a hash reference keyed by a running number (0, 1, ...), each entry
# holding storage_id, pool, path (optional), last_part (the volume name) and
# all (the complete dataset name).
#
# Skipped are: cdrom drives, qemu disks with backup disabled, lxc mount
# points without backup enabled, and lines without a '<storage>:<volume>'
# value. Dies if a resolved path has an unexpected format or if no disk is
# left at all.
sub parse_disks {
    my ($text, $ip, $vm_type, $user) = @_;

    my $disks;

    my $num = 0;
    # consume the config line by line
    while ($text && $text =~ s/^(.*?)(\n|$)//) {
	my $line = $1;

	next if $line =~ /media=cdrom/;
	next if $line !~ m/$DISK_KEY_RE/;

	# qemu: disks are synced unless backup is explicitly disabled
	next if $vm_type eq 'qemu' && ($line =~ m/backup=(?i:0|no|off|false)/);

	# lxc: mount points are only synced if backup is explicitly enabled.
	# \d+ so that this also applies to mp10 and above.
	next if $vm_type eq 'lxc' && ($line =~ m/^mp\d+:/) && ($line !~ m/backup=(?i:1|yes|on|true)/);

	my $disk = undef;
	my $stor = undef;
	if($line =~ m/$DISK_KEY_RE(.*)$/) {
	    my @parameter = split(/,/,$1);

	    # the first option of the form [file=|volume=]<storage>:<volume>
	    foreach my $opt (@parameter) {
		if ($opt =~ m/^(?:file=|volume=)?([^:]+):([A-Za-z0-9\-]+)$/){
		    $disk = $2;
		    $stor = $1;
		    last;
		}
	    }
	}
	if (!defined($disk) || !defined($stor)) {
	    print "Disk: \"$line\" has no valid zfs dataset format and will be skipped\n";
	    next;
	}

	my $cmd = [];
	push @$cmd, 'ssh', "$user\@$ip", '--' if $ip;
	push @$cmd, 'pvesm', 'path', "$stor:$disk";
	my $path = run_cmd($cmd);

	die "Get no path from pvesm path $stor:$disk\n" if !$path;

	$disks->{$num}->{storage_id} = $stor;

	if ($vm_type eq 'qemu' && $path =~ m/^\/dev\/zvol\/(\w+.*)(\/$disk)$/) {

	    # /dev/zvol/<pool>[/<path>]/<disk>
	    my @array = split('/', $1);
	    $disks->{$num}->{pool} = shift(@array);
	    $disks->{$num}->{all} = $disks->{$num}->{pool};
	    if (0 < @array) {
		$disks->{$num}->{path} = join('/', @array);
		$disks->{$num}->{all} .= "\/$disks->{$num}->{path}";
	    }
	    $disks->{$num}->{last_part} = $disk;
	    $disks->{$num}->{all} .= "\/$disk";

	    $num++;
	} elsif ($vm_type eq 'lxc' && $path =~ m/^\/(\w+.+)(\/(\w+.*))*(\/$disk)$/) {

	    $disks->{$num}->{pool} = $1;
	    $disks->{$num}->{all} = $disks->{$num}->{pool};

	    if ($2) {
		$disks->{$num}->{path} = $3;
		$disks->{$num}->{all} .= "\/$disks->{$num}->{path}";
	    }

	    $disks->{$num}->{last_part} = $disk;
	    $disks->{$num}->{all} .= "\/$disk";

	    $num++;

	} else {
	    die "ERROR: in path\n";
	}
    }

    die "Vm include no disk on zfs.\n" if !$disks->{0};
    return $disks;
}

# Name of the dataset on the destination that corresponds to the source:
# <dest dataset>[/<prepend>][/<last part of the source>], with repeated
# slashes collapsed.
sub target_dataset {
    my ($source, $dest) = @_;

    my @parts = ("$dest->{all}");
    push @parts, $dest->{prepend} if defined($dest->{prepend});
    push @parts, $source->{last_part} if $source->{last_part};

    my $target = join('/', @parts);
    $target =~ s!/+!/!g;

    return $target;
}

# Make sure the parent dataset '<dest dataset>/<prepend>' exists on the
# destination, creating it if necessary.
# Dies if $dest->{prepend} or $source->{last_part} are not set.
sub prepare_prepended_target {
    my ($source, $dest, $dest_user) = @_;

    defined($dest->{prepend})
	or die "internal error - not a prepended target\n";

    # The parent dataset shouldn't be the actual target.
    $source->{last_part}
	or die "internal error - no last_part for source\n";

    my $target = "$dest->{all}/$dest->{prepend}";
    $target =~ s!/+!/!g;

    if (!check_dataset_exists($target, $dest->{ip}, $dest_user)) {
	return create_file_system($target, $dest->{ip}, $dest_user);
    }

    return;
}

# Destroy the snapshot $snap of the source dataset and, if $dest is given,
# also of the corresponding dataset on the destination.
# The source side only goes through ssh if $source->{ip} is set and $method
# is 'ssh'. Failures are reported as warnings and do not abort.
sub snapshot_destroy {
    my ($source, $dest, $method, $snap, $source_user, $dest_user) = @_;

    my @destroy = ('zfs', 'destroy');

    my @source_ssh = ($source->{ip} && $method eq 'ssh')
	? ('ssh', "$source_user\@$source->{ip}", '--')
	: ();

    eval { run_cmd([ @source_ssh, @destroy, "$source->{all}\@$snap" ]) };
    warn "WARN: $@" if $@;

    return if !$dest;

    my @dest_ssh = $dest->{ip} ? ('ssh', "$dest_user\@$dest->{ip}", '--') : ();
    my $path = target_dataset($source, $dest);

    eval { run_cmd([ @dest_ssh, @destroy, "$path\@$snap" ]) };
    warn "WARN: $@" if $@;
}

# Check whether the snapshot $dest->{last_snap} exists on the source side,
# which is required for an incremental sync.
# Returns 1 if it exists; otherwise warns and returns undef.
sub snapshot_exist {
    my ($source , $dest, $method, $source_user) = @_;

    my @ssh = $source->{ip} ? ('ssh', "$source_user\@$source->{ip}", '--') : ();
    my $snapshot = "$source->{all}\@$dest->{last_snap}";

    eval { run_cmd([ @ssh, 'zfs', 'list', '-rt', 'snapshot', '-Ho', 'name', $snapshot ]) };

    if ($@) {
	warn "WARN: $@";
	return undef;
    }

    return 1;
}

# Transfer the snapshot $source->{new_snap} to the destination with a
# 'zfs send | [cstream |] zfs recv' pipeline; either end runs via ssh if the
# respective side has an ip.
#
# The stream is incremental (-i) if the destination's last snapshot also
# exists on the source. $param->{limit} (kBytes/s) enables the cstream
# bandwidth limit. If the transfer fails, the new snapshot is destroyed on
# the source and the error is re-thrown.
sub send_image {
    my ($source, $dest, $param) = @_;

    my @ssh = ('ssh', '-o', 'BatchMode=yes');

    # sending side
    my @send = $source->{ip} ? (@ssh, "$param->{source_user}\@$source->{ip}", '--') : ();
    push @send, 'zfs', 'send';
    push @send, '-p' if $param->{properties};
    push @send, '-v' if $param->{verbose};

    if ($dest->{last_snap} && snapshot_exist($source , $dest, $param->{method}, $param->{source_user})) {
	push @send, '-i', "$source->{all}\@$dest->{last_snap}";
    }
    push @send, '--', "$source->{all}\@$source->{new_snap}";

    # optional bandwidth limit; scalar references reach the shell unquoted
    my @throttle = $param->{limit}
	? (\'|', 'cstream', '-t', $param->{limit}*1024)
	: ();

    # receiving side
    my $target = target_dataset($source, $dest);
    my @recv = $dest->{ip} ? (@ssh, "$param->{dest_user}\@$dest->{ip}", '--') : ();
    push @recv, 'zfs', 'recv', '-F', '--', "$target";

    eval { run_cmd([ @send, @throttle, \'|', @recv ]) };

    if (my $erro = $@) {
	snapshot_destroy($source, undef, $param->{method}, $source->{new_snap}, $param->{source_user}, $param->{dest_user});
	die $erro;
    }
}


# Copy the guest configuration file to the destination, stored as
# '<vmid>.conf.<vm_type>.<new_snap>' below $dest_config_path (default:
# $CONFIG_PATH), in a subdirectory named after $dest->{last_part} if set.
#
# Method 'ssh' uses scp between the involved hosts and, if
# $source->{destroy} is set, removes the config belonging to
# $dest->{old_snap}. Method 'local' uses a plain cp.
sub send_config{
    my ($source, $dest, $method, $source_user, $dest_user, $dest_config_path) = @_;

    my $conf_dir = $source->{vm_type} eq 'qemu' ? $QEMU_CONF : $LXC_CONF;
    my $source_target = "$conf_dir/$source->{vmid}.conf";

    my $config_dir = $dest_config_path // $CONFIG_PATH;
    $config_dir .= "/$dest->{last_part}" if $dest->{last_part};

    my $name_prefix = "$source->{vmid}.conf.$source->{vm_type}";
    my $dest_target_new = "$config_dir/$name_prefix.$source->{new_snap}";

    if ($method eq 'local') {
	run_cmd(['mkdir', '-p', '--', $config_dir]);
	run_cmd(['cp', $source_target, $dest_target_new]);
	return;
    }

    return if $method ne 'ssh';

    # commands on the destination side run via ssh if dest is remote
    my @dest_ssh = $dest->{ip} ? ('ssh', "$dest_user\@$dest->{ip}", '--') : ();

    if ($dest->{ip} || $source->{ip}) {
	my $from = $source->{ip}
	    ? "$source_user\@[$source->{ip}]:$source_target"
	    : $source_target;
	my $to = $dest->{ip}
	    ? "$dest_user\@[$dest->{ip}]:$dest_target_new"
	    : $dest_target_new;

	run_cmd([@dest_ssh, 'mkdir', '-p', '--', $config_dir]);
	run_cmd(['scp', '--', $from, $to]);
    }

    if ($source->{destroy}){
	my $dest_target_old = "${config_dir}/$name_prefix.$dest->{old_snap}";
	run_cmd([@dest_ssh, 'rm', '-f', '--', $dest_target_old]);
    }
}

# Current local time formatted as 'YYYY-MM-DD_hh:mm:ss'.
sub get_date {
    # localtime: (sec, min, hour, mday, mon, year, ...)
    my @now = localtime(time);

    return sprintf ("%04d-%02d-%02d_%02d:%02d:%02d", $now[5]+1900, $now[4]+1, @now[3, 2, 1, 0]);
}

# Returns a table (one string) with the stored state of every job found in
# the cron file.
sub status {
    my $cfg = read_cron();

    my $status_list = sprintf("%-25s%-25s%-10s\n", "SOURCE", "NAME", "STATUS");

    my $states = read_state();

    foreach my $source (sort keys %{$cfg}) {
	foreach my $sync_name (sort keys %{$cfg->{$source}}) {
	    $status_list .= sprintf(
		"%-25s%-25s%s\n",
		cut_target_width($source, 25),
		cut_target_width($sync_name, 25),
		$states->{$source}->{$sync_name}->{state},
	    );
	}
    }

    return $status_list;
}

# Set the state of the job given by $param to "ok" and update the state file
# and the cron file accordingly. Dies if the job does not exist.
sub enable_job {
    my ($param) = @_;

    my $set_enabled = sub {
	my $job = get_job($param);

	$job->{state} = "ok";

	update_state($job);
	update_cron($job);
    };

    locked("$CONFIG_PATH/cron_and_state.lock", $set_enabled);
}

# Set the state of the job given by $param to "stopped" and update the state
# file and the cron file accordingly (format_job() writes stopped jobs as
# comments). Dies if the job does not exist.
sub disable_job {
    my ($param) = @_;

    my $set_disabled = sub {
	my $job = get_job($param);

	$job->{state} = "stopped";

	update_state($job);
	update_cron($job);
    };

    locked("$CONFIG_PATH/cron_and_state.lock", $set_disabled);
}

# Help text per command. The keys also serve as the list of known commands:
# a command without an entry here is rejected as unknown.
my $cmd_help = {
    destroy => qq{
$PROGNAME destroy --source <string> [OPTIONS]

    Remove a sync Job from the scheduler

	--name      string
		The name of the sync job, if not set 'default' is used.

	--source    string
		The source can be an  <VMID> or [IP:]<ZFSPool>[/Path]
    },
    create => qq{
$PROGNAME create --dest <string> --source <string> [OPTIONS]

    Create a new sync-job

	--dest      string
		The destination target is like [IP:]<Pool>[/Path]

	--dest-user string
		The name of the user on the destination target, root by default

	--limit     integer
		Maximal sync speed in kBytes/s, default is unlimited

	--maxsnap   integer
		How much snapshots will be kept before get erased, default 1

	--name      string
		The name of the sync job, if not set it is default

	--prepend-storage-id
		If specified, prepend the storage ID to the destination's path(s).

	--skip
		If specified, skip the first sync.

	--source    string
		The source can be an <VMID> or [IP:]<ZFSPool>[/Path]

	--source-user    string
		The (ssh) user-name on the source target, root by default

	--properties
		If specified, include the dataset's properties in the stream.

	--dest-config-path    string
		Specifies a custom config path on the destination target.
		The default is /var/lib/pve-zsync
    },
    sync => qq{
$PROGNAME sync --dest <string> --source <string> [OPTIONS]\n

    Trigger one sync.

	--dest      string
		The destination target is like [IP:]<Pool>[/Path]

	--dest-user string
		The (ssh) user-name on the destination target, root by default

	--limit     integer
		The maximal sync speed in kBytes/s, default is unlimited

	--maxsnap   integer
		Configure how many snapshots will be kept before get erased, default 1

	--name      string
		The name of the sync job, if not set it is 'default'.
		It is only necessary if scheduler already contains this source.

	--prepend-storage-id
		If specified, prepend the storage ID to the destination's path(s).

	--source    string
		The source can either be an <VMID> or [IP:]<ZFSPool>[/Path]

	--source-user    string
		The name of the user on the source target, root by default

	--verbose
		If specified, print out the sync progress.

	--properties
		If specified, include the dataset's properties in the stream.

	--dest-config-path    string
		Specifies a custom config path on the destination target.
		The default is /var/lib/pve-zsync
    },
    list => qq{
$PROGNAME list

	Get a List of all scheduled Sync Jobs
    },
    status => qq{
$PROGNAME status

	Get the status of all scheduled Sync Jobs
    },
    help => qq{
$PROGNAME help <cmd> [OPTIONS]

    Get help about specified command.

	<cmd>      string
		Command name to get help about.

	--verbose
		Verbose output format.
    },
    enable => qq{
$PROGNAME enable --source <string> [OPTIONS]

    Enable a sync-job and reset all job-errors, if any.

	--name      string
		name of the sync job, if not set it is default

        --source    string
		the source can be an  <VMID> or [IP:]<ZFSPool>[/Path]
    },
    disable => qq{
$PROGNAME disable --source <string> [OPTIONS]

    Disables (pauses) a sync-job

	--name      string
		name of the sync-job, if not set it is default

	--source    string
		the source can be an  <VMID> or [IP:]<ZFSPool>[/Path]
    },
    printpod => "$PROGNAME printpod\n\n\tinternal command",

};

# Validate the requested command before any option parsing takes place:
#  - no command at all: print the usage including the "no command" error line
#  - unknown command: name the offending command, then print the plain usage
# Both cases end the program through die.
if (!$command) {
    usage(); die "\n";
} elsif (!$cmd_help->{$command}) {
    # terminate the line, otherwise the usage text is glued onto this message
    print "ERROR: unknown command '$command'\n";
    usage(1); die "\n";
}

# parse_argv() gets a copy of the arguments; @ARGV itself is still read later
# on (the 'help' command looks at $ARGV[1]).
my @arg = @ARGV;
my $param = parse_argv(@arg);

# Ensure that every listed option holds a true value in $param.
# Dies with the help text of the current command on the first one that
# does not.
sub check_params {
    my (@required) = @_;

    foreach my $key (@required) {
	next if $param->{$key};
	die "$cmd_help->{$command}\n";
    }
}

# Run the handler registered for $command. Commands operating on a job first
# verify that their required options are set and then pass each of them to
# check_target(), in the order given.
{
    my $require_targets = sub {
	my (@options) = @_;

	check_params(@options);
	for my $option (@options) {
	    check_target($param->{$option});
	}
    };

    my $handlers = {
	destroy => sub {
	    $require_targets->('source');
	    destroy_job($param);
	},
	sync => sub {
	    $require_targets->('source', 'dest');
	    sync($param);
	},
	create => sub {
	    $require_targets->('source', 'dest');
	    init($param);
	},
	status => sub {
	    print status();
	},
	list => sub {
	    print list();
	},
	help => sub {
	    my $topic = $ARGV[1];

	    # the help text of a known command is emitted through die
	    die "$cmd_help->{$topic}\n" if $topic && $cmd_help->{$topic};

	    # without --verbose only the short usage is shown, with it the
	    # process is replaced by the man page viewer
	    return usage(1) if !$param->{verbose};
	    exec("man $PROGNAME");
	},
	enable => sub {
	    $require_targets->('source');
	    enable_job($param);
	},
	disable => sub {
	    $require_targets->('source');
	    disable_job($param);
	},
	printpod => sub {
	    print_pod();
	},
    };

    my $handler = $handlers->{$command};
    $handler->() if $handler;
}

# Print the short usage overview to STDOUT.
# When called without a true argument, the overview is preceded by an error
# line stating that no command was specified.
sub usage {
    my ($skip_error) = @_;

    my @lines;
    push @lines, "ERROR:\tno command specified\n" if !$skip_error;
    push @lines, (
	"USAGE:\t$PROGNAME <COMMAND> [ARGS] [OPTIONS]\n",
	"\t$PROGNAME help [<cmd>] [OPTIONS]\n\n",
	"\t$PROGNAME create --dest <string> --source <string> [OPTIONS]\n",
	"\t$PROGNAME destroy --source <string> [OPTIONS]\n",
	"\t$PROGNAME disable --source <string> [OPTIONS]\n",
	"\t$PROGNAME enable --source <string> [OPTIONS]\n",
	"\t$PROGNAME list\n",
	"\t$PROGNAME status\n",
	"\t$PROGNAME sync --dest <string> --source <string> [OPTIONS]\n",
    );

    print join('', @lines);
}

# Check a source/destination specification by handing it to parse_target().
# NOTE(review): presumably parse_target() dies on malformed input - confirm.
sub check_target {
    my $target = shift;

    return parse_target($target);
}

# Print the manual page source in POD format to STDOUT.
# The list of commands and the per-command help texts are taken from
# $cmd_help, both in sorted order.
sub print_pod {

    my $synopsis = join("\n", sort values %$cmd_help);
    my $commands = join(", ", sort keys %$cmd_help);

    # NOTE: the example command below must be separated from the preceding
    # text by a blank line, else POD does not treat it as verbatim paragraph.
    print <<EOF;
=head1 NAME

pve-zsync - PVE ZFS Storage Sync Tool

=head1 SYNOPSIS

pve-zsync <COMMAND> [ARGS] [OPTIONS]

Where <COMMAND> can be one of: $commands

=head1 DESCRIPTION

The pve-zsync tool can help you to sync your VMs or directories stored on ZFS
between multiple servers.

pve-zsync is able to automatically configure CRON jobs, so that a periodic sync
will be automatically triggered.
The default sync interval is 15 min, if you want to change this value you can
do this in F</etc/cron.d/pve-zsync>. If you need help to configure CRON tabs, see
man crontab.

=head1 COMMANDS AND OPTIONS

$synopsis

=head1 EXAMPLES

Adds a job for syncing the local VM 100 to a remote server's ZFS pool named "tank":

    pve-zsync create --source=100 --dest=192.168.1.2:tank

=head1 IMPORTANT FILES

Cron jobs and config are stored in F</etc/cron.d/pve-zsync>

The VM configuration itself gets copied to the destination machines
F</var/lib/pve-zsync/> path.

=head1 COPYRIGHT AND DISCLAIMER

Copyright (C) 2007-2021 Proxmox Server Solutions GmbH

This program is free software: you can redistribute it and/or modify it under
the terms of the GNU Affero General Public License as published by the Free
Software Foundation, either version 3 of the License, or (at your option) any
later version.

This program is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
PARTICULAR PURPOSE. See the GNU Affero General Public License for more
details.

You should have received a copy of the GNU Affero General Public License along
with this program. If not, see <http://www.gnu.org/licenses/>.

EOF
}
