/*--
  This file is a part of ZetaGrid, a simple and secure Grid Computing
  kernel.

  Copyright (c) 2001-2004 Sebastian Wedeniwski.  All rights reserved.

  Use in source and binary forms, with or without modification,
  are permitted provided that the following conditions are met:

  1. The source code must retain the above copyright
     notice, this list of conditions and the following disclaimer.

  2. The origin of this software must not be misrepresented; you must
     not claim that you wrote the original software.  If you plan to
     use this software in a product, please contact the author.

  3. Altered source versions must be plainly marked as such, and must
     not be misrepresented as being the original software. The author
     must be informed about these changes.

  4. The name of the author may not be used to endorse or promote
     products derived from this software without specific prior written
     permission.

  THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS
  OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
  WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
  ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY
  DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
  DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE
  GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
  INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
  WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
  NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
  SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

  Version 1.9.0, February 8, 2004

  This program is based on the work of:
     H. Haddorp
     S. Wedeniwski
     W. Westje
--*/

package zeta.handler;

import java.io.File;
import java.io.IOException;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.servlet.http.HttpUtils;

import zeta.TaskServer;
import zeta.WorkUnit;
import zeta.ZetaServlet;
import zeta.processor.TaskRequestWorkUnitProcessor;
import zeta.util.Base64;
import zeta.util.CachedQueries;
import zeta.util.DatabaseUtils;
import zeta.util.Parameter;

/**
 *  Handles a GET request for new work units.
 *  The request must contain the following parameters:
 *  <ul>
 *  <li><code>user</code> - name of the resource provider.</li>
 *  <li><code>email</code> - email of the resource provider.</li>
 *  <li><code>team</code> - name of the team which the resource provider has joined.</li>
 *  <li><code>task</code> - name of task for the work units.</li>
 *  <li><code>size</code> - size of the work units.</li>
 *  <li><code>work_units</code> - number of work units which are requested.</li>
 *  <li><code>hostname</code> - name of the host of the resource provider.</li>
 *  <li><code>hostaddr</code> - TCP/IP address of the host of the resource provider.</li>
 *  <li><code>version</code> - version number of the task.</li>
 *  <li><code>os_name</code> - name of the operating system of the resource provider.</li>
 *  <li><code>os_version</code> - version of the operating system of the resource provider.</li>
 *  <li><code>os_arch</code> - processor architecture of the requested resource.</li>
 *  </ul>
 *  The response contains the parameters (work_unit_id, size) of the new work units.
 *  The new work units are generated by the following algorithm:
 *  <ol>
 *  <li>Set the number of requested work units equal to the number of defined processors of the specified <code>hostname</code>
 *      if the number of defined processors is larger than the number of requested work units.</li>
 *  <li>Add all active work units which are reserved for the specified <code>task</code>, <code>hostname</code>
 *      and <code>hostaddr</code> to the response.
 *      Note: the global parameter 'work_unit_id_complete' is used to reduce the evaluation time.</li>
 *  <li>Add all active work units which are reserved for the specified <code>task</code>, <code>hostname</code>, <code>user</code> and
 *      <code>email</code> to the response. This is important for resources which use DHCP.
 *      Note: the global parameter 'work_unit_id_complete' is used to reduce the evaluation time.</li>
 *  <li>Register a new resource provider if the pair (<code>user</code>, <code>email</code>) is unknown.</li>
 *  <li>Update the registered TCP/IP address of the resource <code>hostname</code> by <code>hostaddr</code>
 *      if the <code>hostaddr</code> is different to the registered address and the resource provider is equal to
 *      the pair (<code>user</code>, <code>email</code>).</li>
 *  <li>Register a new resource (<code>hostname</code>,<code>hostaddr</code>,<code>os_name</code>,<code>os_version</code>,<code>os_arch</code>)
 *      if the pair (<code>hostname</code>,<code>hostaddr</code>) is unknown.</li>
 *  <li>Add and reserve work units for the <code>task</code> from the recomputation pool if the resource provider is activated for recomputations.</li>
 *  <li>Add and reserve new work units for the specified <code>task</code>, resource and its provider. The new identifications of the work units
 *      depend on the largest known identification, the size and the global parameter 'work_unit_id_overlap'.
 *      Note: The actual sizes of the new work units can differ even if the same <code>size</code> was requested.</li>
 *  </ol>
**/
public class RequestWorkUnitHandler implements GetHandler {

  /**
   *  Creates a handler which is bound to the specified servlet.
   *  The handler uses the servlet for logging, for obtaining database connections
   *  and for reading the server ID.
   *  @param servlet  servlet which owns this handler.
  **/
  public RequestWorkUnitHandler(ZetaServlet servlet) {
    this.servlet = servlet;
  }

  /**
   *  Handles a GET request for new work units.
   *  The request <code>req</code> must contain the following parameters:
   *  <ul>
   *  <li><code>user</code> - name of the resource provider.</li>
   *  <li><code>email</code> - email of the resource provider (an empty string is used if it is missing).</li>
   *  <li><code>team</code> - name of the team which the resource provider has joined.</li>
   *  <li><code>task</code> - name of task for the work units.</li>
   *  <li><code>size</code> - size of the work units.</li>
   *  <li><code>work_units</code> - number of work units which are requested (1 is used if it is missing or not a number).</li>
   *  <li><code>hostname</code> - name of the host of the resource provider.</li>
   *  <li><code>hostaddr</code> - TCP/IP address of the host of the resource provider.</li>
   *  <li><code>version</code> - version number of the task.</li>
   *  <li><code>os_name</code> - name of the operating system of the resource provider.</li>
   *  <li><code>os_version</code> - version of the operating system of the resource provider.</li>
   *  <li><code>os_arch</code> - processor architecture of the requested resource.</li>
   *  </ul>
   *  The following parameters are read in addition:
   *  <ul>
   *  <li><code>messages</code> - <code>true</code> is mapped to 'Y', every other value to 'N'; the value is stored as
   *      <code>email_valid_YN</code> when a new user is inserted.</li>
   *  <li><code>key</code> - Base64 encoded key of the workstation; it is decoded as UTF-8, cut to 200 characters
   *      and converted to lower case.</li>
   *  <li><code>processors</code> - number of processors of the resource (1 is used if it is missing or not a number).</li>
   *  </ul>
   *  The response <code>resp</code> contains the parameters (work_unit_id, size) of the work units,
   *  in the form written by <code>WorkUnit.writeObject()</code>.
   *  <p>
   *  The status <code>SC_NOT_IMPLEMENTED</code> is sent if no task can be resolved. An empty <code>text/plain</code>
   *  response is sent if <code>user</code> is missing or blank, if <code>hostname</code> is missing, or if the
   *  activation of a new work unit returns a negative value.
   *  @param req   request which contains the parameters as query string.
   *  @param resp  response which receives the work units.
   *  @throws ServletException  if the task provides no request work unit processor.
   *  @throws SQLException  if a database access fails.
   *  @throws IOException  if the response cannot be written.
  **/
  public void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, SQLException, IOException {
    TaskServer task = TaskServer.getTask(servlet, HttpUtils.parseQueryString(req.getQueryString()));
    if (task == null) {
      servlet.log("no valid task is defined.");
      resp.sendError(HttpServletResponse.SC_NOT_IMPLEMENTED);
      return;
    }
    // NOTE(review): only 'user' is passed through DatabaseUtils.encodeName; the other request values
    // (email, team, hostname, hostaddr, key, version, os_*) are concatenated into SQL text below as they are.
    // Confirm whether task.getParameter() already sanitizes them, otherwise this is open to SQL injection.
    String user = DatabaseUtils.encodeName(task.getParameter("user"));
    if (user == null) { // user is not defined
      resp.setContentType("text/plain");
      resp.setContentLength(0);
      resp.getOutputStream().print("");
      return;
    }
    user = user.trim();
    if (user.length() == 0) { // user is not defined
      resp.setContentType("text/plain");
      resp.setContentLength(0);
      resp.getOutputStream().print("");
      return;
    }
    if (user.equalsIgnoreCase("anonymous")) {
      user = "anonymous";
    }
    String eMail = task.getParameter("email");
    if (eMail == null) {
      eMail = "";
    }
    eMail = eMail.toLowerCase();
    // receiveMessages stays null if the parameter is missing; otherwise it becomes "Y" or "N"
    String receiveMessages = task.getParameter("messages");
    if (receiveMessages != null) {
      receiveMessages = receiveMessages.equals("true")? "Y" : "N";
    }
    String team = task.getParameter("team");
    String hostname = task.getParameter("hostname");
    if (hostname == null) {
      resp.setContentType("text/plain");
      resp.setContentLength(0);
      resp.getOutputStream().print("");
      return;
    }
    hostname = hostname.toLowerCase();
    // NOTE(review): hostaddr is not checked for null; a missing value ends up as the text "null" in the SQL below.
    String hostaddr = task.getParameter("hostaddr");
    String key = task.getParameter("key");
    if (key != null && key.length() > 0) {
      key = new String(Base64.decode(key), "UTF-8");
      if (key.length() > 200) {
        key = key.substring(0, 200);
      }
      key = key.toLowerCase();
    }
    //waitIfSameUser(user, eMail, hostname);
    String taskName = task.getParameter("task");
    String sizeOfWorkUnit = task.getParameter("size");
    String version = task.getParameter("version");
    String osName = task.getParameter("os_name");
    if (osName == null) {
      osName = "";
    }
    String osVersion = task.getParameter("os_version");
    if (osVersion == null) {
      osVersion = "";
    }
    String osArch = task.getParameter("os_arch");
    if (osArch == null) {
      osArch = "";
    }
    int processors = 1;
    try {
      processors = Integer.parseInt(task.getParameter("processors"));
    } catch (Exception e) {
      processors = 1;
    }
    int numberOfWorkUnits = 1;
    try {
      numberOfWorkUnits = Integer.parseInt(task.getParameter("work_units"));
    } catch (Exception e) {
      numberOfWorkUnits = 1;
    }

    Connection con  = null;
    Statement  stmt = null;
    try {
      con  = servlet.getConnection();
      stmt = con.createStatement();
      int taskId = task.getId();
      int serverId = servlet.getServerId();
      // Determine user ID
      int userId = 0;
      int fail = 0;
      int trust = 0;
      String teamName = null;
      String emailValid = receiveMessages;
      // The user cache is keyed by (server ID, user name, email); the cached value is
      // { id, fail, trust, email_valid_YN, team_name }.
      List searchKey = new ArrayList(3);
      searchKey.add(new Integer(serverId));
      searchKey.add(user);
      searchKey.add(eMail);
      Object[] value = (Object[])userMap.get(searchKey);
      if (value == null) {
        ResultSet rs = stmt.executeQuery("SELECT id,fail,trust,email_valid_YN,team_name FROM zeta.user WHERE server_id=" + serverId + " AND name='" + user + "' AND email='" + eMail + '\'');
        if (rs.next()) {
          userId = rs.getInt(1);
          fail = rs.getInt(2);
          trust = rs.getInt(3);
          emailValid = rs.getString(4);
          teamName = rs.getString(5);
          rs.close();
        } else {
          // Unknown (user, email) pair: register a new user with the next free ID.
          rs.close();
          teamName = team;
          // The class lock serializes the MAX(id)+1 allocation between the requests handled by this class.
          synchronized (RequestWorkUnitHandler.class) {
            rs = stmt.executeQuery("SELECT MAX(id) FROM zeta.user WHERE server_id=" + serverId);
            userId = (rs.next())? rs.getInt(1)+1 : 1;
            rs.close();
            if (team != null) {
              DatabaseUtils.executeAndLogUpdate(servlet,
                                                "INSERT INTO zeta.user (id,server_id,name,email,email_valid_YN,team_name,join_in_team) VALUES ("
                                                + userId + ',' + serverId + ",'" + user + "','" + eMail + "','" + ((receiveMessages == null || receiveMessages.equals("Y"))? 'Y' : 'N') + "','"
                                                + team + "',CURRENT TIMESTAMP)");
            } else {
              DatabaseUtils.executeAndLogUpdate(servlet,
                                                "INSERT INTO zeta.user (id,server_id,name,email,email_valid_YN) VALUES ("
                                                + userId + ',' + serverId + ",'" + user + "','" + eMail + "','" + ((receiveMessages == null || receiveMessages.equals("Y"))? 'Y' : 'N') + "')");
            }
          }
        }
        userMap.put(searchKey, new Object[] { new Integer(userId), new Integer(fail), new Integer(trust), emailValid, teamName });
      } else {
        userId = ((Integer)value[0]).intValue();
        fail = ((Integer)value[1]).intValue();
        trust = ((Integer)value[2]).intValue();
        emailValid = (String)value[3];
        teamName = (String)value[4];
      }

      // Limit the number of requested work units depending on the trust level of the user.
      numberOfWorkUnits = Math.max(numberOfWorkUnits, 1);
      if (trust >= 2 && numberOfWorkUnits > MAX_WORK_UNITS_TRUST2) {
        numberOfWorkUnits = MAX_WORK_UNITS_TRUST2;
      } else if (trust == 1 && numberOfWorkUnits > MAX_WORK_UNITS_TRUST1) {
        numberOfWorkUnits = MAX_WORK_UNITS_TRUST1;
      } else if (trust == 0 && numberOfWorkUnits > MAX_WORK_UNITS_TRUST0) {
        numberOfWorkUnits = MAX_WORK_UNITS_TRUST0;
      }
      // Remember the limited number before 'processors_approved' of the workstation may raise it.
      int originNumberOfWorkUnits = numberOfWorkUnits;

      // Determine workstation ID
      int workstationId = 0;
      if (key != null && key.length() > 0) {
        // First attempt: identify the workstation by its key.
        ResultSet rs = stmt.executeQuery("SELECT id,processors_approved FROM zeta.workstation WHERE server_id=" + serverId + " AND key='" + key + '\'');
        if (rs.next()) {
          workstationId = rs.getInt(1);
          int processorsApproved = rs.getInt(2);
          if (processorsApproved > numberOfWorkUnits) {
            numberOfWorkUnits = processorsApproved;
          }
        }
        rs.close();
      }
      if (workstationId == 0) {
        // Second attempt: search among the workstations for which the user already has computations.
        // A new list must be used as key here: the previous list instance may be stored as a key in
        // userMap, and changing a key object after it has been put into a map breaks later lookups.
        searchKey = new ArrayList(2);
        searchKey.add(new Integer(serverId));
        searchKey.add(new Integer(userId));
        List workstationList = (List)workstationMap.get(searchKey);
        if (workstationList == null) {
          workstationList = new ArrayList(3);
          // ToDo: index on user_id?
          ResultSet rs = stmt.executeQuery("SELECT DISTINCT workstation_id FROM zeta.computation WHERE server_id=" + serverId + " AND user_id=" + userId);
          while (rs.next()) {
            workstationList.add(new Integer(rs.getInt(1)));
          }
          rs.close();
          workstationMap.put(searchKey, workstationList);
        }
        int l = workstationList.size();
        if (l > 0) {
          StringBuffer buffer = new StringBuffer(1000);
          buffer.append("SELECT id,processors_approved,key FROM zeta.workstation WHERE server_id=");
          buffer.append(serverId);
          buffer.append(" AND hostname='");
          buffer.append(hostname);
          buffer.append("' AND id IN (");
          for (int i = 0; i < l; ++i) {
            if (i > 0) {
              buffer.append(',');
            }
            buffer.append(workstationList.get(i));
          }
          buffer.append(')');
          // idx marks the start of the address condition so that it can be removed for the retry below.
          int idx = buffer.length();
          buffer.append(" AND hostaddress='");
          buffer.append(hostaddr);
          buffer.append('\'');
          ResultSet rs = stmt.executeQuery(buffer.toString());
          if (rs.next()) {
            // hostname and address match a known workstation
            workstationId = rs.getInt(1);
            int processorsApproved = rs.getInt(2);
            String workstationKey = rs.getString(3);
            rs.close();
            if (processorsApproved > numberOfWorkUnits) {
              numberOfWorkUnits = processorsApproved;
            }
            if (key != null && !key.equals(workstationKey)) {
              stmt.executeUpdate("UPDATE zeta.workstation SET key='" + key + "' WHERE server_id=" + serverId + " AND id=" + workstationId);
            }
          } else {
            // retry with the hostname only
            rs.close();
            buffer.delete(idx, buffer.length());
            rs = stmt.executeQuery(buffer.toString());
            if (rs.next()) {  // workstation changed IP address, e.g. DHCP
              workstationId = rs.getInt(1);
              int processorsApproved = rs.getInt(2);
              String workstationKey = rs.getString(3);
              rs.close();
              if (processorsApproved > numberOfWorkUnits) {
                numberOfWorkUnits = processorsApproved;
              }
              if (key != null && !key.equals(workstationKey)) {
                stmt.executeUpdate("UPDATE zeta.workstation SET (key,hostaddress)=('" + key + "','" + hostaddr + "') WHERE server_id=" + serverId + " AND id=" + workstationId);
              } else {
                stmt.executeUpdate("UPDATE zeta.workstation SET hostaddress='" + hostaddr + "' WHERE server_id=" + serverId + " AND id=" + workstationId);
              }
            } else {
              rs.close();
              l = 0;  // nothing found: register the workstation below
            }
          }
        }
        if (l == 0) {   // New Workstation
          synchronized (RequestWorkUnitHandler.class) {
            ResultSet rs = stmt.executeQuery("SELECT MAX(id) FROM zeta.workstation WHERE server_id=" + serverId);
            workstationId = (rs.next())? rs.getInt(1) : 0;
            // the new ID is also added to the cached workstation list of the user
            workstationList.add(new Integer(++workstationId));
            rs.close();
            if (key != null) {
              long lastUpdate = GetClientHandler.getKeyTimestamp(key);
              if (lastUpdate > 0) {
                DatabaseUtils.executeAndLogUpdate(servlet,
                                                  "INSERT INTO zeta.workstation (id,server_id,key,hostname,hostaddress,os_name,os_version,os_arch,processors,last_update) VALUES ("
                                                  + workstationId + ',' + serverId + ",'" + key + "','" + hostname + "','" + hostaddr
                                                  + "','" + osName + "','" + osVersion + "','" + osArch + "'," + processors + ",'" + (new Timestamp(lastUpdate)) + "')");
              } else {
                DatabaseUtils.executeAndLogUpdate(servlet,
                                                  "INSERT INTO zeta.workstation (id,server_id,key,hostname,hostaddress,os_name,os_version,os_arch,processors) VALUES ("
                                                  + workstationId + ',' + serverId + ",'" + key + "','" + hostname + "','" + hostaddr
                                                  + "','" + osName + "','" + osVersion + "','" + osArch + "'," + processors + ')');
              }
            } else {
              DatabaseUtils.executeAndLogUpdate(servlet,
                                                "INSERT INTO zeta.workstation (id,server_id,hostname,hostaddress,os_name,os_version,os_arch,processors) VALUES ("
                                                + workstationId + ',' + serverId + ",'" + hostname + "','" + hostaddr
                                                + "','" + osName + "','" + osVersion + "','" + osArch + "'," + processors + ')');
            }
          }
        }
      }

      // Initialize the request work unit processor
      TaskRequestWorkUnitProcessor processor = task.getRequestWorkUnitProcessor();
      if (processor == null) {
        servlet.log("could not resolve request work unit processor for task ID: " + taskId + ").");
        throw new ServletException("Could not resolve request work unit processor for task ID: " + taskId + ").");
      }

      // Active work units
      // (the last two arguments of Parameter.getValue are presumably a default value and a cache
      //  lifetime in milliseconds - confirm against Parameter)
      long workUnitIdComplete = Parameter.getValue(stmt, "work_unit_id_complete", taskId, 0, 3600000);
      final long workUnitIdOverlap = Parameter.getValue(stmt, "work_unit_id_overlap", taskId, 0, 3600000);
      // Collect the IDs of all work units which were handed out to this workstation at or above the lower bound.
      Set activeWorkUnits = new TreeSet();
      ResultSet rs = stmt.executeQuery("SELECT work_unit_id FROM zeta.computation WHERE task_id=" + taskId
                                     + " AND work_unit_id>=" + (workUnitIdComplete-2*workUnitIdOverlap)
                                     + " AND server_id=" + serverId
                                     + " AND workstation_id=" + workstationId);
      while (rs.next()) {
        activeWorkUnits.add(new Long(rs.getLong(1)));
      }
      rs.close();
      // Remove the IDs which already have a result. Every pass checks at most the first 251 IDs of the set.
      // NOTE(review): every pass starts again at the beginning of the set and the loop ends as soon as a pass
      // removes nothing, so IDs behind the first 251 may never be checked - confirm that this is intended.
      int sz;
      Iterator iter;
      do {
        sz = activeWorkUnits.size();
        iter = activeWorkUnits.iterator();
        StringBuffer buffer = new StringBuffer(4000);
        buffer.append("SELECT work_unit_id FROM zeta.result WHERE task_id=");
        buffer.append(taskId);
        buffer.append(" AND work_unit_id IN (");
        if (iter.hasNext()) {
          buffer.append(iter.next());
          for (int j = 0; j < 250 && iter.hasNext(); ++j) {
            buffer.append(',');
            buffer.append(iter.next());
          }
          buffer.append(')');
          rs = stmt.executeQuery(buffer.toString());
          while (rs.next()) {
            activeWorkUnits.remove(new Long(rs.getLong(1)));
          }
          rs.close();
        }
      } while (iter.hasNext() && activeWorkUnits.size() > 0 && sz > activeWorkUnits.size());
      // workUnits receives only the newly generated work units which must be inserted into zeta.computation;
      // workUnitsAll receives every work unit which is written to the response.
      // Both arrays are filled from the last index downwards; numberOfWorkUnits is the number of free slots.
      WorkUnit[] workUnits = null;
      WorkUnit[] workUnitsAll = null;
      if (numberOfWorkUnits > 0) {
        workUnits = new WorkUnit[numberOfWorkUnits];
        workUnitsAll = new WorkUnit[numberOfWorkUnits];
      }
      if (numberOfWorkUnits > 0 && activeWorkUnits.size() > 0) {
        // Hand out again the work units which are still active for this workstation.
        StringBuffer buffer = new StringBuffer(2000);
        buffer.append("SELECT work_unit_id,range FROM zeta.computation WHERE task_id=");
        buffer.append(taskId);
        buffer.append(" AND work_unit_id IN (");
        iter = activeWorkUnits.iterator();
        buffer.append(iter.next());
        while (iter.hasNext()) {
          buffer.append(',');
          buffer.append(iter.next());
        }
        buffer.append(") ORDER BY start,work_unit_id");  // redistributed work units do not have higher priority!
        rs = stmt.executeQuery(buffer.toString());
        while (numberOfWorkUnits > 0 && rs.next()) {
          --numberOfWorkUnits;
          workUnitsAll[numberOfWorkUnits] = processor.getWorkUnit(taskId, rs.getLong(1), rs.getInt(2));
        }
        rs.close();
      }
      if (numberOfWorkUnits > 0) {
        // The class lock serializes the reservation of recomputations and the generation of new work unit IDs.
        synchronized (RequestWorkUnitHandler.class) {
          // Recomputation of the work unit?
          rs = stmt.executeQuery("SELECT work_unit_id,range FROM zeta.recomputation WHERE task_id=" + taskId
                                 + " AND server_id=" + serverId
                                 + " AND (start IS NULL AND (workstation_id IS NULL OR workstation_id=" + workstationId + ") AND " + userId
                                 + " IN (" + CachedQueries.getUsersForRecomputation(stmt, serverId)
                                 + " ) OR stop IS NULL AND user_id=" + userId
                                 + " AND workstation_id=" + workstationId + ") ORDER BY start,work_unit_id");  // redistributed work units does not have higher priority!
          int size = DEFAULT_WORK_UNIT_SIZE;
          long workUnitId = 0;
          if (rs.next()) {
            // 'where' collects the IDs of the recomputations which are reserved by the update below
            StringBuffer where = new StringBuffer(100);
            do {
              --numberOfWorkUnits;
              workUnitId = rs.getLong(1);
              size = rs.getInt(2);
              workUnitsAll[numberOfWorkUnits] = processor.getWorkUnit(taskId, workUnitId, size);
              if (where.length() > 0) {
                where.append(',');
              }
              where.append(workUnitId);
            } while (numberOfWorkUnits > 0 && rs.next());
            rs.close();
            DatabaseUtils.executeAndLogUpdate(servlet,
                                              "UPDATE zeta.recomputation SET (version,workstation_id,user_id,start)=('" + version
                                              + "'," + workstationId + ',' + userId + ",CURRENT TIMESTAMP) WHERE task_id=" + taskId
                                              + " AND server_id=" + serverId + " AND work_unit_id IN (" + where.toString() + ')');
          } else rs.close();
          // A user with failures gets at most one new work unit, and only if no slot has been used so far.
          // NOTE(review): the comparison uses the number before 'processors_approved' raised it, so a raised
          // number always leads to 0 here - confirm that this is intended.
          if (fail > 0) {
            numberOfWorkUnits = (originNumberOfWorkUnits == numberOfWorkUnits)? 1 : 0;
          }
          // Generate new work units
          if (numberOfWorkUnits == 1) { // Fill Gaps
            String minFillGaps = Parameter.getValue(stmt, "min_fill_gaps", taskId, "", 3600000);
            if (minFillGaps.length() > 0) {
              String maxFillGaps = Parameter.getValue(stmt, "max_fill_gaps", taskId, "", 3600000);
              // a second statement is needed because the result set of 'stmt' stays open during the loop
              Statement stmt2 = null;
              try {
                stmt2 = con.createStatement();
                rs = stmt.executeQuery("SELECT work_unit_id,range FROM zeta.server_range WHERE server_id=" + serverId
                                     + " AND task_id=" + taskId + " AND work_unit_id between " + minFillGaps + " and " + maxFillGaps
                                     + " ORDER BY work_unit_id");
                while (rs.next()) {
                  long serverMaxWorkUnitId = rs.getLong(1);
                  long serverMaxWorkUnitIdEnd = serverMaxWorkUnitId+rs.getLong(2);
                  ResultSet rs2 = stmt2.executeQuery("SELECT MAX(work_unit_id)"
                                                   + " FROM zeta.computation WHERE task_id=" + taskId + " AND work_unit_id>=" + serverMaxWorkUnitId
                                                   + " AND work_unit_id<" + (serverMaxWorkUnitIdEnd-workUnitIdOverlap));
                  if (rs2.next()) {
                    long max = rs2.getLong(1);
                    rs2.close();
                    rs2 = stmt2.executeQuery("SELECT range FROM zeta.computation WHERE task_id=" + taskId + " AND work_unit_id=" + max);
                    if (rs2.next()) {
                      max += rs2.getInt(1);
                      if (max < serverMaxWorkUnitIdEnd) {
                        // The range of the server is not completely handed out: continue behind its last work unit.
                        workUnitId = (max > serverMaxWorkUnitId)? max-workUnitIdOverlap : max;
                        // getWorkUnitSize may run a query on 'stmt' while 'rs' is still open; this is harmless
                        // only because the loop is left directly afterwards.
                        size = getWorkUnitSize(taskId, sizeOfWorkUnit, workUnitId, stmt);
                        int size2 = (serverMaxWorkUnitIdEnd < workUnitId+size)? ((int)(serverMaxWorkUnitIdEnd-workUnitId+workUnitIdOverlap)) : size;
                        --numberOfWorkUnits;
                        // NOTE(review): unlike the work units generated below, this one is not passed to
                        // processor.activateWorkUnit - confirm that this is intended.
                        workUnitsAll[numberOfWorkUnits] = workUnits[numberOfWorkUnits] = processor.getWorkUnit(taskId, workUnitId, size2);
                        break;
                      }
                    }
                  }
                  rs2.close();
                }
                rs.close();
              } finally {
                DatabaseUtils.close(stmt2);
              }
            }
          }
          if (numberOfWorkUnits > 0) {
            // Determine the ID of the first new work unit inside the current range of the server;
            // a new range is requested if the current range is unused or exhausted.
            long[] serverMaxWorkUnitId = null;
            while (true) {
              serverMaxWorkUnitId = Parameter.getServerMaxWorkUnitId(servlet, stmt, serverId, taskId);
              rs = stmt.executeQuery("SELECT MAX(work_unit_id+range-" + workUnitIdOverlap
                                     + ") FROM zeta.computation WHERE task_id=" + taskId + " AND work_unit_id+range>=" + serverMaxWorkUnitId[0]
                                     + " AND work_unit_id+range<=" + (serverMaxWorkUnitId[0]+serverMaxWorkUnitId[1]));
              if (rs.next()) {
                workUnitId = rs.getLong(1);
                if (workUnitId == 0 && rs.wasNull()) {
                  rs.close();
                  workUnitId = Parameter.newServerMaxWorkUnitId(servlet, stmt, serverId, taskId);
                  if (workUnitId > 0) {
                    break;
                  }
                // NOTE(review): 'size' is still the default size or the range of the last recomputation here;
                // the size of the requested work unit is determined only after this loop.
                } else if (workUnitId+size >= serverMaxWorkUnitId[0]+serverMaxWorkUnitId[1]) {
                  rs.close();
                  rs = stmt.executeQuery("SELECT MAX(work_unit_id) FROM zeta.computation WHERE task_id=" + taskId + " AND work_unit_id+range>=" + serverMaxWorkUnitId[0]
                                       + " AND work_unit_id<" + (serverMaxWorkUnitId[0]+serverMaxWorkUnitId[1]-workUnitIdOverlap));
                  if (rs.next() && rs.getLong(1) == workUnitId) {
                    rs.close();
                    workUnitId = Parameter.newServerMaxWorkUnitId(servlet, stmt, serverId, taskId);
                    if (workUnitId > 0) {
                      break;
                    }
                  } else {
                    rs.close();
                    break;
                  }
                } else {
                  rs.close();
                  break;
                }
              } else {
                rs.close();
                workUnitId = Parameter.newServerMaxWorkUnitId(servlet, stmt, serverId, taskId);
                if (workUnitId > 0) {
                  break;
                }
              }
            }
            size = getWorkUnitSize(taskId, sizeOfWorkUnit, workUnitId, stmt);
            do {
              // switch to a new range of the server if the current range is exhausted
              if (serverMaxWorkUnitId[0]+serverMaxWorkUnitId[1] <= workUnitId) {
                Parameter.newServerMaxWorkUnitId(servlet, stmt, serverId, taskId);
                serverMaxWorkUnitId = Parameter.getServerMaxWorkUnitId(servlet, stmt, serverId, taskId);
              }
              // cut the work unit at the end of the range; size2 is the size which is really used
              int size2 = size;
              if (serverMaxWorkUnitId[0]+serverMaxWorkUnitId[1] < workUnitId+size) {
                size2 = (int)(serverMaxWorkUnitId[0]+serverMaxWorkUnitId[1]-workUnitId+workUnitIdOverlap);
                Parameter.newServerMaxWorkUnitId(servlet, stmt, serverId, taskId);
                serverMaxWorkUnitId = Parameter.getServerMaxWorkUnitId(servlet, stmt, serverId, taskId);
              }
              workUnitsAll[numberOfWorkUnits-1] = workUnits[numberOfWorkUnits-1] = processor.getWorkUnit(taskId, workUnitId, size2);
              int r = processor.activateWorkUnit(stmt, workUnits[numberOfWorkUnits-1]);
              if (r < 0) {
                // negative value: give up and send an empty response
                resp.setContentType("text/plain");
                resp.setContentLength(0);
                resp.getOutputStream().print("");
                return;
              } else if (r == 0) {
                // zero: this is the last work unit, the loop condition below ends the loop
                numberOfWorkUnits = 1;
              }
              // the next work unit overlaps the previous one unless the previous one was cut at the end of the range
              workUnitId += (size == size2)? size-workUnitIdOverlap : size2;
            } while (--numberOfWorkUnits > 0);
          }
        }
      }
      if (workUnits != null && workUnitsAll != null) {
        insertWorkUnitsIntoDatabase(workUnits, processor, taskId, serverId, workstationId, version, userId);
        // The arrays were filled from the end, so the work units are written in the order in which they were assigned.
        StringBuffer response = new StringBuffer(85*workUnits.length);
        for (int i = workUnitsAll.length-1; i >= 0; --i) {
          if (workUnitsAll[i] != null && workUnitsAll[i].isValid()) {
            response.append(workUnitsAll[i].writeObject());
          }
        }
        resp.setContentType("text/plain");
        // NOTE(review): the content length is the number of characters, which equals the number of bytes
        // only if every character is written as a single byte - confirm for the output of writeObject().
        resp.setContentLength(response.length());
        resp.getOutputStream().print(response.toString());
      } else {
        resp.setContentType("text/plain");
        resp.setContentLength(0);
        resp.getOutputStream().print("");
      }
    } finally {
      DatabaseUtils.close(stmt);
      DatabaseUtils.close(con);
    }
  }

  /**
   *  Inserts all valid work units of <code>workUnits</code> into the table <code>zeta.computation</code>
   *  by one INSERT statement with one row per work unit. Elements which are <code>null</code> or not valid
   *  are skipped; no statement is executed if no element remains.
   *  @param workUnits  new work units; the array is processed from the last to the first element.
   *  @param processor  processor which supplies the value of the column <code>parameters</code> for every work unit
   *                    (SQL NULL is stored if it returns <code>null</code>).
   *  @param taskId  identification of the task.
   *  @param serverId  identification of the server.
   *  @param workstationId  identification of the workstation which receives the work units.
   *  @param version  version of the task as sent by the client.
   *  @param userId  identification of the user who receives the work units.
   *  @throws ServletException  declared for the calls of the processor and of the update.
   *  @throws SQLException  if the database update fails.
  **/
  private void insertWorkUnitsIntoDatabase(WorkUnit[] workUnits, TaskRequestWorkUnitProcessor processor, int taskId, int serverId, int workstationId, String version, int userId) throws ServletException, SQLException  {
    // 'version' is text supplied by the client, so it must not be put into the SQL text as it is.
    // It is encoded by the helper which this method already applies to the column 'parameters'.
    // A missing version is still written as the text 'null', as before.
    final String encodedVersion = (version == null)? null : DatabaseUtils.encodeName(version);
    StringBuffer sql = new StringBuffer(150*workUnits.length+120);
    for (int i = workUnits.length-1; i >= 0; --i) {
      final WorkUnit workUnit = workUnits[i];
      if (workUnit == null || !workUnit.isValid()) {
        continue;
      }
      // the first row starts the statement, every further row is appended as an additional tuple
      if (sql.length() == 0) {
        sql.append("INSERT INTO zeta.computation (task_id,work_unit_id,range,server_id,workstation_id,user_id,version,start,parameters) VALUES (");
      } else {
        sql.append(",(");
      }
      sql.append(taskId);
      sql.append(',');
      sql.append(workUnit.getWorkUnitId());
      sql.append(',');
      sql.append(workUnit.getSize());
      sql.append(',');
      sql.append(serverId);
      sql.append(',');
      sql.append(workstationId);
      sql.append(',');
      sql.append(userId);
      sql.append(",'");
      sql.append(encodedVersion);
      sql.append("',CURRENT_TIMESTAMP,");
      String parameters = processor.getParameters(workUnit);
      if (parameters == null) {
        sql.append("NULL");
      } else {
        sql.append('\'');
        sql.append(DatabaseUtils.encodeName(parameters));
        sql.append('\'');
      }
      sql.append(')');
    }
    if (sql.length() > 0) {
      DatabaseUtils.executeAndLogUpdate(servlet, sql.toString());
    }
  }

  /**
   *  Returns the size of a work unit for a specified <code>taskId</code>, <code>sizeOfWorkUnit</code> and <code>workUnitId</code>.
   *  The sizes defined in the table <code>zeta.work_unit_size</code> are cached in <code>workUnitSize</code> as a list of
   *  alternating (work_unit_id, range) entries in descending order of work_unit_id. The result is the range of the
   *  first entry whose work_unit_id is less than or equal to <code>workUnitId</code>. If the cache contains no such
   *  entry then the sizes are (re)loaded from the database and searched in the same way; if there is still no
   *  such entry then the range of the first loaded row is returned. If the table contains no row at all then
   *  <code>DEFAULT_WORK_UNIT_SIZE</code> is cached and returned.
   *  NOTE(review): <code>sizeOfWorkUnit</code> is concatenated into the SQL statement without encoding - verify
   *  that the callers never pass unchecked client input.
   *  @param taskId  identification of the task.
   *  @param sizeOfWorkUnit  value which is compared with the column <code>size</code> of <code>zeta.work_unit_size</code>.
   *  @param workUnitId  id of the work unit
   *  @param stmt  statement which is used to query the database.
   *  @return the size of a work unit for a specified <code>taskId</code>, <code>sizeOfWorkUnit</code> and <code>workUnitId</code>.
   *  @exception SQLException  if a database access error occurs.
  **/
  private int getWorkUnitSize(int taskId, String sizeOfWorkUnit, long workUnitId, Statement stmt) throws SQLException {
    String search = Integer.toString(taskId) + ':' + sizeOfWorkUnit;
    List sizes = (List)workUnitSize.get(search);
    if (sizes != null) {
      int idx = indexOfWorkUnitSize(sizes, workUnitId);
      if (idx >= 0) {
        return ((Long)sizes.get(idx)).intValue();
      }
    }
    // nothing cached or no cached entry applies: (re)load the defined sizes from the database
    String query = "SELECT work_unit_id,range FROM zeta.work_unit_size WHERE task_id=" + taskId + " AND size='" + sizeOfWorkUnit + "' ORDER BY work_unit_id DESC";
    ResultSet rs = stmt.executeQuery(query);
    try {
      if (rs.next()) {
        sizes = new ArrayList(20);
        do {
          sizes.add(new Long(rs.getLong(1)));
          sizes.add(new Long(rs.getInt(2)));
        } while (rs.next());
        workUnitSize.put(search, sizes);
        // use the same rule as for a cache hit so that the result does not depend on the state of the cache;
        // the range of the first row remains the fallback if no entry applies
        int idx = indexOfWorkUnitSize(sizes, workUnitId);
        return ((Long)sizes.get((idx >= 0)? idx : 1)).intValue();
      }
      servlet.log("not found: " + query);
    } finally {
      rs.close();
    }
    sizes = new ArrayList(2);
    sizes.add(new Long(0));
    sizes.add(new Long(DEFAULT_WORK_UNIT_SIZE));
    workUnitSize.put(search, sizes);
    return DEFAULT_WORK_UNIT_SIZE;
  }

  /**
   *  Searches the list of alternating (work_unit_id, range) entries for the first entry
   *  whose work_unit_id is less than or equal to <code>workUnitId</code>.
   *  @param sizes  list of <code>Long</code> objects containing alternating work_unit_id and range values.
   *  @param workUnitId  id of the work unit
   *  @return the index of the range value of the first matching entry, or -1 if no entry matches.
  **/
  private static int indexOfWorkUnitSize(List sizes, long workUnitId) {
    final int l = sizes.size()-1;
    for (int i = 0; i < l; i += 2) {
      if (((Long)sizes.get(i)).longValue() <= workUnitId) {
        return i+1;
      }
    }
    return -1;
  }

  /*
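   *  NOTE: disabled code - this block comment also encloses the method waitIfSameUser and the field queueUser below.
   *  Waits 5 minutes if the same user/workstation issued more than MAX_WORK_UNITS requests for work units
   *  within a time interval of less than 10 minutes without any 'getClient' request in that time interval.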
   *  @param user name of the user
   *  @param eMail e-Mail of the user
   *  @param hostname name of the workstation which requested work units

  private void waitIfSameUser(String user, String eMail, String hostname) {
    // ToDo: Problem with multi-processor machines
    if (GetClientHandler.isHostname(hostname)) {
      return;
    }
    long wait = 0;
    synchronized (queueUser) {
      long timestamp = System.currentTimeMillis();
      int l = queueUser.size();
      int i = 0;
      while (i < l && timestamp-((Long)((Object[])queueUser.get(i))[0]).longValue() > 10*60*1000) {
        ++i;
      }
      if (i > 0) {
        queueUser.subList(0, i).clear();
      }
      int request = 1;
      l = queueUser.size();
      for (i = 0; i < l; ++i) {
        Object[] obj = (Object[])queueUser.get(i);
        if (user.equals(obj[1]) && eMail.equals(obj[2]) && hostname.equals(obj[3])) {
          request = ((Integer)obj[4]).intValue()+1;
          break;
        }
      }
      if (i < l) {
        queueUser.remove(i);
        if (request > MAX_WORK_UNITS) {
          wait = 5 * 60 * 1000;
        }
        servlet.log("wait (" + request + "):" + user + ", " + eMail + ", " + hostname + ", " + wait);
      }
      Object[] obj = { new Long(timestamp), user, eMail, hostname, new Integer(request) };
      queueUser.add(obj);
    }
    if (wait > 0) {
      try {
        Thread.sleep(wait);
      } catch (InterruptedException ex) {
      }
    }
  }


   *  Queue containing the users who requested work units during the last 10 minutes.
  private static List queueUser = new ArrayList(1000);*/

  /**
   *  Default size of a new work unit.
   *  Used by <code>getWorkUnitSize</code> as the fallback if no size is defined in the table <code>zeta.work_unit_size</code>.
  **/
  private static final int DEFAULT_WORK_UNIT_SIZE = 500000;

  /**
   *  Maximal number of work units which can be requested by one client.
   *  NOTE(review): the three limits presumably correspond to the trust levels 0, 1 and 2 of a client;
   *  their use is not visible in this part of the file - confirm.
  **/
  private static final int MAX_WORK_UNITS_TRUST0 = 5;
  private static final int MAX_WORK_UNITS_TRUST1 = 10;
  private static final int MAX_WORK_UNITS_TRUST2 = 25;

  /**
   *  Defined sizes for specified tasks and work unit sizes.
   *  The key is the string <code>taskId + ':' + sizeOfWorkUnit</code>; the value is a list of <code>Long</code> objects
   *  containing alternating (work_unit_id, range) entries in descending order of work_unit_id,
   *  as filled by <code>getWorkUnitSize</code>.
  **/
  private Map workUnitSize = new HashMap();

  /**
   *  Maps a key (server-ID,name,email) of a user to the user-ID.
  **/
  private Map userMap = new HashMap();

  /**
   *  Maps a key (server-ID,user-ID) of a workstation to a list of workstation-IDs.
  **/
  private Map workstationMap = new HashMap();

  /**
   *  Servlet which owns this handler.
   *  It is used for logging and is passed to <code>DatabaseUtils.executeAndLogUpdate</code>.
  **/
  private ZetaServlet servlet;
}