/*--
  This file is a part of ZetaGrid, a simple and secure Grid Computing
  kernel.

  Copyright (c) 2001-2004 Sebastian Wedeniwski.  All rights reserved.

  Use in source and binary forms, with or without modification,
  are permitted provided that the following conditions are met:

  1. The source code must retain the above copyright
     notice, this list of conditions and the following disclaimer.

  2. The origin of this software must not be misrepresented; you must 
     not claim that you wrote the original software.  If you plan to
     use this software in a product, please contact the author.

  3. Altered source versions must be plainly marked as such, and must
     not be misrepresented as being the original software. The author
     must be informed about these changes.

  4. The name of the author may not be used to endorse or promote 
     products derived from this software without specific prior written 
     permission.

  THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS
  OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
  WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
  ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY
  DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
  DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE
  GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
  INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
  WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
  NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
  SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

  Version 1.9.0, February 8, 2004

  This program is based on the work of:
     H. Haddorp
     S. Wedeniwski
--*/

package zeta;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.InetAddress;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLEncoder;
import java.net.UnknownHostException;
import java.security.SecureRandom;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.zip.Deflater;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import java.util.zip.ZipOutputStream;

import zeta.monitor.MonitoredOutputStream;
import zeta.monitor.MonitoredURLStreamHandler;
import zeta.util.Base64;
import zeta.util.StreamUtils;


public class WorkUnitManager {

  /**
   *  Creates the manager for the work units of the specified task.
   *  The constructor cleans up the local work unit files, loads the cached set of possible
   *  work units, transfers all completed local work units (unless an unknown host was
   *  already detected by the client) and, if the properties 'transfer.detect' and
   *  'transfer.asynchronous' request it, starts a background thread which periodically checks
   *  whether the server is reachable and then transfers completed work units.
   *  @param task task which defines the class of the work units
   *  @throws IllegalArgumentException if no instance of the work unit class of the task can be created
  **/
  public WorkUnitManager(Task task) {
    this.task = task;
    try {
      WorkUnit workUnitComperator = (WorkUnit)task.getWorkUnitClass().newInstance();
      possibleWorkUnits = new TreeSet(workUnitComperator);
      activeWorkUnits = new TreeSet(workUnitComperator);
      storeWorkUnits = new TreeSet(workUnitComperator);
    } catch (Exception e) {
      ZetaInfo.handle(e);
      // keep the original exception as cause so that the stack trace is not lost
      IllegalArgumentException iae = new IllegalArgumentException(e.getMessage());
      iae.initCause(e);
      throw iae;
    }
    cleanWorkUnits(null);
    retrievePossibleWorkUnits();
    if (!ZetaClient.isUnknownHostExceptionOccur()) {
      submit(null, false, true);
    }
    completedLocalWorkUnits = numberOfCompletedLocalWorkUnits();
    exitAfterWorkUnits = properties.get("exit", 0);
    exitAfterWorkUnitsActive = (exitAfterWorkUnits > 0);
    int transferDetect = properties.get("transfer.detect", 0);
    if (transferDetect > 0 && properties.get("transfer.asynchronous", "false").equals("true")) {
      Thread t = new Thread() {
        public void run() {
          while (true) {
            // The properties are read again in every cycle, i.e. the detection can be
            // switched off and on while the thread is running.
            int detectSeconds = properties.get("transfer.detect", 0);
            boolean asynchronous = properties.get("transfer.asynchronous", "false").equals("true");
            if (!asynchronous || detectSeconds <= 0) {
              // detection is currently switched off
              try {
                Thread.sleep(60000);  // wait 1 minute
              } catch (InterruptedException ex) {
                // ignored: the loop simply continues with the next cycle
              }
            } else {
              if (numberOfCompletedLocalWorkUnits() > 0) {
                // only try to transfer the results if a connection to the server can be established
                boolean error = true;
                HttpURLConnection connection = null;
                try {
                  URL url = new URL("http", properties.get("host.name", "www.zetagrid.net"), properties.get("host.port", 80),
                                    properties.get("resultURL", "/servlet/service/result"));
                  connection = (HttpURLConnection)url.openConnection();
                  connection.setUseCaches(false);
                  connection.setRequestMethod("GET");
                  connection.setDoInput(true);
                  connection.connect();
                  error = false;
                } catch (Exception e) {
                  error = true;
                } finally {
                  if (connection != null) {
                    connection.disconnect();
                  }
                }
                if (!error) {
                  submit(null, false, false);
                }
              }
              try {
                // at least 5 minutes between two checks; long arithmetic avoids an int overflow
                Thread.sleep(Math.max(detectSeconds, 300)*1000L);
              } catch (InterruptedException ex) {
                // ignored: the loop simply continues with the next cycle
              }
            }
          }
        }
      };
      t.start();
    }
  }

  /**
   *  Selects the next local work unit which can be computed and marks it as active.
   *  Among all files of the current directory which initialize an uncompleted work unit,
   *  the smallest one (according to the comparison of the work unit class) is chosen which
   *  is not active yet and, unless the manager is offline, is contained in the set of
   *  possible work units.
   *  @return a copy of the selected work unit, or null if no work unit is available or an error occurs
  **/
  public WorkUnit getWorkUnit() {
    synchronized (activeWorkUnits) {
      cleanWorkUnits(null);
      try {
        final String[] entries = new File(".").list();
        if (entries == null) {
          return null;
        }
        final WorkUnit candidate = (WorkUnit)task.getWorkUnitClass().newInstance();
        WorkUnit best = null;
        for (int idx = 0; idx < entries.length; ++idx) {
          if (!candidate.init(entries[idx]) || candidate.isCompleted()) {
            continue;
          }
          // the candidate must be smaller than the best work unit found so far
          if (best != null && candidate.compare(candidate, best) >= 0) {
            continue;
          }
          // the candidate must be assigned to this computer (not checked in offline mode)
          if (!offline && !possibleWorkUnits.contains(candidate)) {
            continue;
          }
          if (activeWorkUnits.contains(candidate)) {
            continue;
          }
          best = (WorkUnit)candidate.clone();
        }
        if (best != null) {
          activeWorkUnits.add(best);
        }
        return best;
      } catch (Exception e) {
        ZetaInfo.handle(e);
        return null;
      }
    }
  }

  /**
   *  Transfers results to the server and requests new work units.
   *  The transfer is executed in a separate thread if the property 'transfer.asynchronous'
   *  is "true"; otherwise it is executed in the calling thread.
   *  Nothing is done if the specified work unit is not active or if a transfer for the same
   *  argument is already pending.
   *  @param workUnit  active work unit which should be transferred, or null to transfer all completed local work units
   *  @param exitAfterTransfer  true if the process should exit (status 1) after a successful transfer
   *  @param outputExceptions  passed on to the transfer methods; for a null work unit it also forces a request for new work units
  **/
  public void submit(final WorkUnit workUnit, final boolean exitAfterTransfer, final boolean outputExceptions) {
    synchronized (activeWorkUnits) {
      final boolean notActive = (workUnit != null && !activeWorkUnits.contains(workUnit));
      if (notActive || storeWorkUnits.contains(workUnit)) {
        return;
      }
      storeWorkUnits.add(workUnit);
    }
    final Thread transfer = new Thread() {
      public void run() {
        try {
          if (workUnit != null) {
            transferSingle();
          } else {
            transferAll();
          }
        } finally {
          // the work unit is neither active nor pending any more, regardless of the result
          synchronized (activeWorkUnits) {
            if (workUnit != null) {
              activeWorkUnits.remove(workUnit);
            }
            storeWorkUnits.remove(workUnit);
          }
        }
      }

      // Transfers the specified work unit.
      private void transferSingle() {
        wait = 60000;
        if (!storeData(workUnit, outputExceptions)) {
          return;
        }
        boolean exit = exitAfterTransfer;
        if (!exit && exitAfterWorkUnitsActive) {
          --exitAfterWorkUnits;
          exit = (exitAfterWorkUnits <= numberOfCompletedLocalWorkUnits()-completedLocalWorkUnits-getNumberOfActiveWorkUnits());
        }
        if (exit) {
          System.exit(1);
        }
        requestNewWorkUnits();
        ZetaInfo.write(" ");
      }

      // Transfers all completed local work units.
      private void transferAll() {
        final int stored = storeCompletedWorkUnits(outputExceptions);
        if (stored > 0) {
          if (exitAfterTransfer) {
            System.exit(1);
          }
          if (exitAfterWorkUnitsActive) {
            exitAfterWorkUnits -= stored;
            if (exitAfterWorkUnits <= numberOfCompletedLocalWorkUnits()-completedLocalWorkUnits-getNumberOfActiveWorkUnits()) {
              System.exit(1);
            }
          }
        }
        final boolean requestFailed = (outputExceptions || stored > 0) && !requestNewWorkUnits();
        ZetaInfo.write(" ");
        if (requestFailed) {
          try {
            Thread.sleep(wait);
          } catch (InterruptedException ex) {
          }
          // alternate between 1 minute and 15 minutes
          wait = (wait == 60000)? 900000 : 60000;
        }
      }
    };
    final boolean asynchronous = properties.get("transfer.asynchronous", "false").equals("true");
    if (asynchronous) {
      transfer.start();
    } else {
      transfer.run();
    }
  }

  /**
   *  Transfers the data file and the log file of one completed work unit to the server.
   *  The transfer is tried at most twice and consists of the following steps:
   *  a small POST request to the result URL which checks the service and returns a zipped,
   *  Base64 encoded encryption class together with its digital signature; the verification
   *  of that signature; the encryption of both files into temporary '.$$$' files; and a
   *  second POST request which sends both encrypted files as one zip archive.
   *  After a successful transfer the original data file and log file are deleted; in all
   *  other cases only the temporary '.$$$' files are deleted.
   *  @param workUnit  work unit which should be transferred; nothing is done if it is not completed
   *  @param outputExceptions  true if exceptions which occur during the transfer should be reported
   *  @return true if the server has accepted the result (HTTP 200), false otherwise
  **/
  private boolean storeData(WorkUnit workUnit, boolean outputExceptions) {
    synchronized (WorkUnitManager.class) {
      if (!workUnit.isCompleted()) {
        return false;
      }
      // The generator seeds itself. An explicit setSeed(currentTimeMillis) before the first
      // use can make some providers skip that self-seeding, which makes the output predictable.
      SecureRandom rnd = new SecureRandom();
      boolean error = true;
      for (int tries = 0; error && tries < 2; ++tries) {
        error = false;
        String dataFilename = workUnit.getFilename();
        String logFilename = workUnit.getLogFilename();
        // temporary files which contain the encrypted data
        File dataFile = new File(dataFilename + ".$$$");
        File logFile = new File(logFilename + ".$$$");
        HttpURLConnection connection = null;
        OutputStream out = null;
        try {
          String hostname = properties.get("host.name", "www.zetagrid.net");
          InetAddress localHost = InetAddress.getLocalHost();
          String paramString = "task=" + URLEncoder.encode(task.getTaskname())
                             + "&work_unit_id=" + workUnit.getWorkUnitId()
                             + "&user=" + URLEncoder.encode(properties.get("name"))
                             + "&hostname=" + URLEncoder.encode(localHost.getHostName().toLowerCase())
                             + "&hostaddr=" + URLEncoder.encode(localHost.getHostAddress())
                             + "&key=" + URLEncoder.encode(getKey());
          try {
            paramString = ZetaClient.encryptURLFile(paramString);
          } catch (Throwable t) {   // ToDo: remove code
            // the parameters are sent unencrypted if the encryption is not available
          }
          // 1st request: check the service and retrieve the encryption class
          ZetaInfo.write("Checking connection to server " + hostname);
          URL url = new URL("http", hostname, properties.get("host.port", 80), properties.get("resultURL", "/servlet/service/result"));
          connection = (HttpURLConnection)url.openConnection();
          connection.setUseCaches(false);
          connection.setRequestProperty("Content-Length", "2");
          connection.setRequestProperty("Content-Type", "application/octet-stream");
          connection.setRequestProperty("Param-String",  paramString); // not a standard property !!
          connection.setDoOutput(true);
          connection.setRequestMethod("POST");
          connection.connect();
          out = connection.getOutputStream();
          ZetaInfo.write("Checking availability of the services at " + hostname);
          out.write(new byte[] { 1, 2 });
          out.flush();
          int code = connection.getResponseCode();
          if (code == HttpURLConnection.HTTP_UNAVAILABLE) {
            error = true;
            continue;   // try again (the finally block below is executed first)
          }
          if (code != HttpURLConnection.HTTP_ACCEPTED && code != HttpURLConnection.HTTP_NOT_ACCEPTABLE) {
            ZetaInfo.write("The services at " + hostname + " are not available!");
            return false;
          }
          byte[] keyEncryptorClass = null;
          String keySignature = null;
          ByteArrayOutputStream resultOut = new ByteArrayOutputStream(4 * 1024 * 1024);
          if (code == HttpURLConnection.HTTP_ACCEPTED) {
            try {
              // retrieve the encryptor with its digital signature
              StreamUtils.writeData(connection.getInputStream(), resultOut, false, true);
              ZipInputStream zip = new ZipInputStream(new ByteArrayInputStream(Base64.decode(resultOut.toString("UTF-8"))));
              while (true) {
                ZipEntry entry = zip.getNextEntry();
                if (entry == null) {
                  break;
                }
                String name = entry.getName();
                if (name.equals("className")) {
                  resultOut.reset();
                  StreamUtils.writeData(zip, resultOut, false, true);
                  keyEncryptorClass = resultOut.toByteArray();
                } else if (name.equals("signature")) {
                  resultOut.reset();
                  StreamUtils.writeData(zip, resultOut, false, true);
                  keySignature = resultOut.toString("UTF-8");
                } else {
                  throw new IOException("The signature contains a not valid value '" + name + '\'');
                }
              }
            } catch (IOException ioe) {
              ZetaInfo.handle(ioe);
              keyEncryptorClass = null;
              keySignature = null;
            }
          }
          connection.disconnect();
          connection = null;
          if (keyEncryptorClass == null || keyEncryptorClass.length == 0) {
            ZetaInfo.write("No encryption key is available at " + hostname);
            return false;
          }
          if (keySignature == null || keySignature.length() == 0) {
            ZetaInfo.write("No digital signature is available for the encryption key at " + hostname);
            return false;
          }
          // check signature of the encryption class
          ZetaInfo.write("Check digital signature of the encryption class");
          FileInputStream fin = null;
          try {
            if (!ZetaClient.verify(null, keySignature, keyEncryptorClass)) {
              throw new IOException("Wrong signature for the encryption class");
            }
            // keep a reference to the stream so that it is closed in the finally block below
            fin = new FileInputStream(dataFilename);
            if (!StreamUtils.checkAvailDiskSpace(fin, dataFile)) {
              throw new IOException("Hard disk is full!");
            }
            resultOut.reset();
            ZetaInfo.write("Encrypting work unit " + workUnit.getWorkUnitId());
            ZetaClient.encrypt(rnd.nextInt(1024)+5, keyEncryptorClass, dataFilename, dataFilename + ".$$$");
            ZetaClient.encrypt(rnd.nextInt(1024)+5, keyEncryptorClass, logFilename, logFilename + ".$$$");
          } catch (Error err) {
            // ToDo: remove code
            // NOTE(review): every Error ends in this fallback which encrypts without the
            // verified encryption class - confirm that this is still wanted.
            ZetaInfo.write("The digital signature cannot be verified. Please update your client!");
            if (!checkAvailDiskSpace(dataFilename, dataFile)) {
              throw new IOException("Hard disk is full!");
            }
            resultOut.reset();
            ZetaInfo.write("Encrypting work unit " + workUnit.getWorkUnitId());
            ZetaClient.encrypt(rnd.nextInt(1024)+5, dataFilename, dataFilename + ".$$$");
            ZetaClient.encrypt(rnd.nextInt(1024)+5, logFilename, logFilename + ".$$$");
          } finally {
            StreamUtils.close(fin);
          }
          // pack both encrypted files into one zip archive which is kept in memory
          try {
            ZipOutputStream zip = new ZipOutputStream(resultOut);
            zip.setLevel(Deflater.BEST_COMPRESSION);
            zip.putNextEntry(new ZipEntry(dataFile.getName()));
            StreamUtils.writeData(new FileInputStream(dataFile), zip, true, false);
            zip.putNextEntry(new ZipEntry(logFile.getName()));
            StreamUtils.writeData(new FileInputStream(logFile), zip, true, false);
            zip.close();
          } finally {
            resultOut.close();
          }
          dataFile.delete();
          logFile.delete();

          // 2nd request: transfer the archive
          ZetaInfo.write("Connecting server " + hostname);
          MonitoredOutputStream.setWorkUnitId(workUnit.getWorkUnitId());
          url = new URL("http", hostname, properties.get("host.port", 80), properties.get("resultURL", "/servlet/service/result"), new MonitoredURLStreamHandler());
          connection = (HttpURLConnection)url.openConnection();
          connection.setUseCaches(false);
          byte[] resultBuffer = resultOut.toByteArray();
          connection.setRequestProperty("Content-Length", Long.toString(resultBuffer.length));
          connection.setRequestProperty("Content-Type", "application/octet-stream");
          connection.setRequestProperty("Param-String",  paramString); // not a standard property !!
          connection.setDoOutput(true);
          connection.setRequestMethod("POST");
          connection.connect();
          out = connection.getOutputStream();
          ZetaInfo.write("Server " + hostname + " connected");
          out.write(resultBuffer);
          out.flush();
          ZetaInfo.write("Transfering work unit " + workUnit.getWorkUnitId());
          resultBuffer = null;
          code = connection.getResponseCode();
          if (code != HttpURLConnection.HTTP_OK) {
            ZetaInfo.write("Could not store result: return code " + code);
            System.err.println("Could not store result: return code " + code);
            error = true;
            if (code == HttpURLConnection.HTTP_INTERNAL_ERROR || code == HttpURLConnection.HTTP_UNAVAILABLE) {
              continue;   // try again
            }
            return false;
          }
          ZetaInfo.write("Work unit " + workUnit.getWorkUnitId() + " successfully transferred");
          // The result is stored at the server: let the finally block delete the original files.
          dataFile = new File(dataFilename);
          logFile  = new File(logFilename);
        } catch (UnknownHostException uhe) {
          error = true;
          tries = 1;   // no second try
          if (outputExceptions) {
            ZetaInfo.handle(uhe);
          }
        } catch (IOException e) {
          error = true;
          try {
            // a second try is only made after an internal server error
            if (connection == null || connection.getResponseCode() != HttpURLConnection.HTTP_INTERNAL_ERROR) {
              tries = 1;
            }
          } catch (IOException ioe) {
          }
          if (tries == 1 && outputExceptions) {
            ZetaInfo.handle(e);
          }
          try {
            Thread.sleep(4000);
          } catch (InterruptedException ex) {
          }
        } finally {
          StreamUtils.close(out);
          if (connection != null) {
            connection.disconnect();
            connection = null;
          }
          dataFile.delete();
          logFile.delete();
          cleanWorkUnits(null);
          System.gc();
        }
      }
      return !error;
    }
  }

  /**
   *  Transfers all completed and valid local work units of the current directory to the server.
   *  @param outputExceptions  true if exceptions which occur should be reported
   *  @return number of completed work units which are stored successfully at the server, -1 if an error occurs and no work unit is stored
  **/
  private int storeCompletedWorkUnits(boolean outputExceptions) {
    synchronized (WorkUnitManager.class) {
      int storedWorkUnits = 0;
      try {
        String[] filenames = new File(".").list();
        if (filenames != null) {
          WorkUnit workUnit = (WorkUnit)task.getWorkUnitClass().newInstance();
          for (int i = 0; i < filenames.length; ++i) {
            if (workUnit.init(filenames[i]) && workUnit.isCompleted()) {
              File file = new File(workUnit.getFilename());
              // count only the work units which are really stored at the server
              if (file.exists() && workUnit.isValid() && storeData(workUnit, outputExceptions)) {
                ++storedWorkUnits;
              }
            }
          }
        }
      } catch (Exception e) {
        if (outputExceptions) {
          ZetaInfo.handle(e);
        }
        if (storedWorkUnits == 0) {
          storedWorkUnits = -1;
        }
      }
      return storedWorkUnits;
    }
  }

  /**
   *  Requests new work units from the server and stores the answer as the set of possible work units.
   *  No request is sent if the property 'processors' is not positive or if the number of local
   *  work units which are not older than two days already reaches the property 'work_units';
   *  in the latter case only the cached set of possible work units is loaded again.
   *  The request is tried at most twice; a second try is only made after an internal server error.
   *  @return true if enough local work units are available or the answer of the server was
   *          read, false otherwise
  **/
  private boolean requestNewWorkUnits() {
    synchronized (WorkUnitManager.class) {
      if (properties.get("processors", 1) <= 0) {
        return false;
      }
      int workUnits = properties.get("work_units", 1);
      // Attention: Work units can be redistributed.
      if (workUnits <= numberOfLocalWorkUnits(2*24*60*60*1000)) {  // do not request new work units if there are enough local work units
        retrievePossibleWorkUnits();
        return true;
      }
      String workUnitSize = properties.get("work_unit_size", "m");
      String eMail = properties.get("eMail");
      if (eMail == null) {
        eMail = "";
      }
      String messages = properties.get("messages", null);
      if (messages == null) {
        messages = "";
      } else {
        messages = (messages.equals("true"))? "&messages=true" : "&messages=false";
      }
      String team = properties.get("team");
      if (team == null) {
        team = "";
      }
      String version = task.getVersion();
      for (int tries = 0; tries < 2; ++tries) {
        HttpURLConnection connection = null;
        BufferedReader reader = null;
        try {
          InetAddress localHost = InetAddress.getLocalHost();
          ZetaInfo.write("Requesting new work units");
          String urlFile = properties.get("requestURL", "/servlet/service/requestWorkUnit")
                            + "?user=" + URLEncoder.encode(properties.get("name"))
                            + "&email=" + URLEncoder.encode(eMail)
                            + messages
                            + "&team=" + URLEncoder.encode(team)
                            + "&task=" + URLEncoder.encode(task.getTaskname())
                            + "&size=" + URLEncoder.encode(workUnitSize)
                            + "&hostname=" + URLEncoder.encode(localHost.getHostName().toLowerCase())
                            + "&hostaddr=" + URLEncoder.encode(localHost.getHostAddress())
                            + "&key=" + URLEncoder.encode(getKey())
                            + "&version=" + URLEncoder.encode(version)
                            + "&work_units=" + workUnits
                            + "&os_name=" + URLEncoder.encode(System.getProperty("os.name", "?"))
                            + "&os_version=" + URLEncoder.encode(System.getProperty("os.version", "?"))
                            + "&os_arch=" + URLEncoder.encode(System.getProperty("os.arch", "?"))
                            + "&processors=" + properties.get("processors", 1);
          try {
            urlFile = ZetaClient.encryptURLFile(urlFile);
          } catch (Throwable t) {   // ToDo: remove code
            // the request is sent unencrypted if the encryption is not available
          }
          URL url = new URL("http", properties.get("host.name", "www.zetagrid.net"), properties.get("host.port", 80), urlFile);
          connection = (HttpURLConnection)url.openConnection();
          connection.setUseCaches(false);
          connection.setRequestMethod("GET");
          connection.setDoInput(true);
          connection.connect();
          reader = new BufferedReader(new InputStreamReader(connection.getInputStream()));
          storePossibleWorkUnits(reader);
          return true;
        } catch (UnknownHostException uhe) {
          tries = 1;   // no second try
          ZetaInfo.handle(uhe);
        } catch (IOException e) {   // includes MalformedURLException
          try {
            // a second try is only made after an internal server error
            if (connection == null || connection.getResponseCode() != HttpURLConnection.HTTP_INTERNAL_ERROR) {
              tries = 1;
            }
          } catch (IOException ioe) {
          }
          if (tries == 1) {
            ZetaInfo.handle(e);
          }
          try {
            Thread.sleep(4000);
          } catch (InterruptedException ex) {
          }
        } catch (Exception e) {
          ZetaInfo.handle(e);
          return false;
        } finally {
          StreamUtils.close(reader);
          if (connection != null) {
            connection.disconnect();
            connection = null;
          }
        }
      }
      return false;
    }
  }

  /**
   *  Returns the number of work units which are currently marked as active.
   *  @return size of the set of active work units
  **/
  private int getNumberOfActiveWorkUnits() {
    final int active;
    synchronized (activeWorkUnits) {
      active = activeWorkUnits.size();
    }
    return active;
  }

  /**
   *  Counts the local work unit files of the current directory which were modified within
   *  the specified time and, unless the manager is offline, belong to the possible work units.
   *  @param notOlderThanMillis  maximal age of a counted file in milliseconds (exclusive)
   *  @return number of matching work units; 0 if all local work units are older than the
   *          specified time or if an error occurs.
  **/
  private int numberOfLocalWorkUnits(long notOlderThanMillis) {
    synchronized (activeWorkUnits) {
      try {
        final File[] entries = new File(".").listFiles();
        if (entries == null) {
          return 0;
        }
        final WorkUnit probe = (WorkUnit)task.getWorkUnitClass().newInstance();
        final long now = System.currentTimeMillis();
        int recent = 0;
        for (int k = 0; k < entries.length; ++k) {
          final File entry = entries[k];
          if (!probe.init(entry.getName())) {
            continue;
          }
          final long age = now-entry.lastModified();
          if (age >= notOlderThanMillis) {
            continue;
          }
          if (offline || possibleWorkUnits.contains(probe)) {
            ++recent;
          }
        }
        return recent;
      } catch (Exception e) {
        ZetaInfo.handle(e);
        return 0;
      }
    }
  }

  /**
   *  Counts the completed local work units of the current directory which, unless the
   *  manager is offline, belong to the possible work units.
   *  @return number of completed local work units; 0 if an error occurs.
  **/
  private int numberOfCompletedLocalWorkUnits() {
    synchronized (activeWorkUnits) {
      try {
        final String[] names = new File(".").list();
        if (names == null) {
          return 0;
        }
        final WorkUnit probe = (WorkUnit)task.getWorkUnitClass().newInstance();
        int completed = 0;
        int pos = 0;
        while (pos < names.length) {
          final String name = names[pos++];
          if (!probe.init(name) || !probe.isCompleted()) {
            continue;
          }
          if (offline || possibleWorkUnits.contains(probe)) {
            ++completed;
          }
        }
        return completed;
      } catch (Exception e) {
        ZetaInfo.handle(e);
        return 0;
      }
    }
  }

  /**
   *  Deletes local work unit files of the current directory.
   *  If a list is specified then every log file is deleted which is not contained in the
   *  list and which was last modified more than one day ago.
   *  Independent of the list, every file which initializes a completed work unit is deleted
   *  if the file returned by getFilename() for that work unit does not exist, and every other
   *  file accepted by isFilename() is deleted if its log file does not exist.
   *  Errors are reported and otherwise ignored.
   *  @param workUnitFiles list of work unit logfile names which must be kept, or null to skip the deletion by age
  **/
  private void cleanWorkUnits(List workUnitFiles) {
    try {
      final WorkUnit probe = (WorkUnit)task.getWorkUnitClass().newInstance();
      final File[] entries = new File(".").listFiles();
      if (entries == null) {
        return;
      }
      if (workUnitFiles != null) {
        final long now = System.currentTimeMillis();
        final long oneDayMillis = 24*60*60*1000;
        for (int k = 0; k < entries.length; ++k) {
          final File entry = entries[k];
          // Attention: If many clients use the same shared directory.
          if (probe.isLogFilename(entry.getName())
              && !workUnitFiles.contains(entry.getName())
              && now-entry.lastModified() > oneDayMillis) {
            entry.delete();
          }
        }
      }
      for (int k = 0; k < entries.length; ++k) {
        final String name = entries[k].getName();
        final File counterpart;
        if (probe.init(name) && probe.isCompleted()) {
          counterpart = new File(probe.getFilename());         // missing: error occur
        } else if (probe.isFilename(name)) {
          counterpart = new File(probe.getLogFilename(name));  // missing: old work unit
        } else {
          continue;
        }
        if (!counterpart.exists()) {
          entries[k].delete();
        }
      }
    } catch (Exception e) {
      ZetaInfo.handle(e);
    }
  }

  // ToDo: remove method
  /**
   *  Checks the available disk space by copying the source file to the destination file
   *  and comparing the lengths of both files. The destination file is deleted afterwards.
   *  @param src  name of the file which is copied
   *  @param dest  file which is written for the check
   *  @return true if both files have the same length after the copy
   *  @throws IOException if the copy fails
  **/
  private boolean checkAvailDiskSpace(String src, File dest) throws IOException {
    // check available disk space:
    ZetaInfo.write("Check available disk space");
    final File source = new File(src);
    FileInputStream in = null;
    FileOutputStream copy = null;
    boolean sameLength = false;
    try {
      in = new FileInputStream(src);
      copy = new FileOutputStream(dest);
      StreamUtils.writeData(in, copy, false, false);
    } finally {
      StreamUtils.close(in);
      StreamUtils.close(copy);
      final long expected = source.length();
      sameLength = (expected == dest.length());
      dest.delete();
    }
    return sameLength;
  }

  /**
   *  Loads the set of possible work units from the local file '<host name>.tmp'.
   *  Every line of the file which initializes a valid work unit is added to the set.
   *  Afterwards the manager is offline if and only if the set is empty.
   *  A missing file is silently ignored; all other errors are reported.
  **/
  private void retrievePossibleWorkUnits() {
    offline = true;
    synchronized (activeWorkUnits) {
      possibleWorkUnits.clear();
      BufferedReader cache = null;
      try {
        final WorkUnit parser = (WorkUnit)task.getWorkUnitClass().newInstance();
        final String cacheName = InetAddress.getLocalHost().getHostName() + ".tmp";
        cache = new BufferedReader(new FileReader(cacheName));
        for (String entry = cache.readLine(); entry != null; entry = cache.readLine()) {
          if (parser.init(entry) && parser.isValid()) {
            possibleWorkUnits.add(parser.clone());
          }
        }
      } catch (FileNotFoundException e) {
        // no work units are cached yet
      } catch (Exception e) {
        ZetaInfo.handle(e);
      } finally {
        StreamUtils.close(cache);
        offline = possibleWorkUnits.isEmpty();
      }
    }
  }

  /**
   *  Replaces the set of possible work units by the work units which are parsed from the
   *  specified reader, writes their log filenames line by line into the local file
   *  '<host name>.tmp' and cleans up the local work unit files with the parsed list.
   *  Errors are reported and otherwise ignored.
   *  @param workUnits  reader which is passed to the parser of the work unit class
  **/
  private void storePossibleWorkUnits(BufferedReader workUnits) {
    synchronized (activeWorkUnits) {
      BufferedWriter cache = null;
      List assigned = null;
      try {
        final WorkUnit parser = (WorkUnit)task.getWorkUnitClass().newInstance();
        assigned = parser.parseWorkUnitFiles(workUnits);
        possibleWorkUnits.clear();
        for (Iterator names = assigned.iterator(); names.hasNext(); ) {
          final String name = (String)names.next();
          if (parser.init(name) && parser.isValid()) {
            possibleWorkUnits.add(parser.clone());
          }
        }
        offline = false;
        final String cacheName = InetAddress.getLocalHost().getHostName() + ".tmp";
        cache = new BufferedWriter(new FileWriter(cacheName));
        for (Iterator units = possibleWorkUnits.iterator(); units.hasNext(); ) {
          final WorkUnit unit = (WorkUnit)units.next();
          cache.write(unit.getLogFilename());
          cache.newLine();
        }
      } catch (Exception ioe) {
        ZetaInfo.handle(ioe);
      } finally {
        StreamUtils.close(cache);
        // delete all other work units
        if (assigned != null) {
          cleanWorkUnits(assigned);
        }
      }
    }
  }

  /**
   *  Returns a unique key of this computer.
   *  The key of the client is used; if that fails, the key is built from the properties
   *  'name' and 'eMail' and the local host name (lower case, Base64 encoded).
   *  @return a unique key of this computer.
  **/
  private String getKey() {
    try {
      return ZetaClient.getKey();
    } catch (Throwable t) {   // ToDo: remove
      String fallback = "";
      try {
        fallback = properties.get("name") + properties.get("eMail") + InetAddress.getLocalHost().getHostName();
        final byte[] raw = fallback.toLowerCase().getBytes("UTF-8");
        fallback = Base64.encode(raw);
      } catch (Exception e) {
        // the value which was built so far is used
      }
      return fallback;
    }
  }

  /**
   *  Contains a persistent set of the ZetaGrid properties.
  **/
  private ZetaProperties properties = new ZetaProperties();

  // Task which defines the work unit class, the task name and the version.
  private Task task;
  // Work units which are loaded from the file '<host name>.tmp' or parsed from the answer of the server.
  private Set possibleWorkUnits;
  // Work units which are handed out by getWorkUnit(); this set is also used as the lock object of the manager.
  private Set activeWorkUnits;
  // Arguments of submit() whose transfer is pending; null stands for the transfer of all completed work units.
  private Set storeWorkUnits;
  // If true then work units are accepted without checking possibleWorkUnits.
  private boolean offline = false;
  // Number of completed local work units which was counted in the constructor.
  private int completedLocalWorkUnits;
  // Value of the property 'exit', decreased by the number of transferred work units.
  private int exitAfterWorkUnits = 0;
  // True if the property 'exit' was positive when the manager was created.
  private boolean exitAfterWorkUnitsActive = false;
  // Sleep time in milliseconds after a failed request for new work units; alternates between 60000 and 900000.
  private int wait = 60000;
}