package zeta.handler;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Timestamp;
import java.util.Map;
import java.util.zip.Deflater;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.servlet.http.HttpUtils;
import zeta.TaskServer;
import zeta.ZetaServlet;
import zeta.processor.TaskResultProcessor;
import zeta.util.Base64;
import zeta.util.DatabaseUtils;
import zeta.util.Parameter;
import zeta.util.ProcessUtils;
import zeta.util.StreamUtils;
public class ResultHandler implements PostHandler {
public ResultHandler(ZetaServlet servlet) {
this.servlet = servlet;
}
public void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, SQLException, IOException {
Connection con = null;
PreparedStatement pStmt = null;
Statement stmt = null;
String contentType = req.getContentType();
if (contentType.startsWith("application/")) {
if (!contentType.equals("application/octet-stream")) {
servlet.log("wrong content type: " + contentType);
}
TaskServer task = null;
try {
String paramString = req.getHeader("Param-String");
if (paramString == null || paramString.length() == 0) {
resp.sendError(HttpServletResponse.SC_NOT_IMPLEMENTED);
return;
}
if (concurrentConnections >= MAX_CONCURRENT_CONNECTIONS) {
resp.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE);
servlet.log("too many concurrent connections!");
return;
}
task = TaskServer.getTask(servlet, HttpUtils.parseQueryString(paramString)); } catch (IllegalArgumentException iae) {
servlet.log("Param-String is invalid: " + req.getHeader("Param-String"));
}
if (task == null) {
servlet.log("no valid task is defined.");
resp.sendError(HttpServletResponse.SC_NOT_IMPLEMENTED);
return;
}
int bufferSize = req.getContentLength();
String workUnitId = null;
try {
workUnitId = task.getParameter("work_unit_id");
if (workUnitId == null || workUnitId.length() == 0 || Long.parseLong(workUnitId) <= 0) {
servlet.log("wrong work unit ID: " + workUnitId + ", bufferSize=" + bufferSize);
resp.sendError(HttpServletResponse.SC_NOT_IMPLEMENTED);
return;
}
} catch (Exception e) {
servlet.log("exception: wrong work unit ID: " + workUnitId + ", bufferSize=" + bufferSize);
resp.setStatus(HttpServletResponse.SC_OK);
resp.setContentType("text/plain");
resp.setContentLength(2);
resp.getOutputStream().print("ok");
return;
}
try {
++concurrentConnections; servlet.log("concurrent connections: " + concurrentConnections + ", work unit ID: " + workUnitId);
if (bufferSize <= 2) {
synchronized (encryptionClass) {
if (task.getId() != encryptionClassTaskId) {
ByteArrayOutputStream out = new ByteArrayOutputStream(2000);
ZipOutputStream zip = new ZipOutputStream(out);
zip.setLevel(Deflater.BEST_COMPRESSION);
zip.putNextEntry(new ZipEntry("className"));
StreamUtils.writeData(new ByteArrayInputStream(task.getEncryptionClass()), zip, true, false);
zip.putNextEntry(new ZipEntry("signature"));
StreamUtils.writeData(new ByteArrayInputStream(task.getEncryptionSignature().getBytes("UTF-8")), zip, true, false);
zip.flush();
zip.close();
encryptionClass = Base64.encode(out.toByteArray());
encryptionClassTaskId = task.getId();
}
resp.setStatus(HttpServletResponse.SC_ACCEPTED);
resp.setContentType("text/plain");
resp.setContentLength(encryptionClass.length());
resp.getOutputStream().print(encryptionClass);
}
return;
}
String user = DatabaseUtils.encodeName(task.getParameter("user"));
String hostname = task.getParameter("hostname");
String hostaddr = task.getParameter("hostaddr");
con = servlet.getConnection();
stmt = con.createStatement();
int taskId = task.getId();
TaskResultProcessor processor = task.getResultProcessor();
if (processor == null) {
servlet.log("could not resolve result processor for task ID: " + taskId + ").");
}
ResultSet rs = stmt.executeQuery("SELECT work_unit_id FROM zeta.result WHERE task_id=" + taskId + " AND work_unit_id=" + workUnitId);
if (!rs.next()) { rs.close();
Timestamp currentTime = new Timestamp(System.currentTimeMillis());
try {
DatabaseUtils.close(stmt);
stmt = null;
DatabaseUtils.close(con);
con = null;
ByteArrayOutputStream out = new ByteArrayOutputStream(bufferSize);
StreamUtils.writeData(req.getInputStream(), out, false, true);
byte[] buffer = out.toByteArray();
if (buffer.length != bufferSize) {
if (dataNotCompleteWorkUnitId.equals(workUnitId)) {
++dataNotCompleteCount;
} else {
dataNotCompleteWorkUnitId = workUnitId;
dataNotCompleteCount = 1;
}
if (dataNotCompleteCount >= MAX_DATA_NOT_COMPLETE_COUNT) {
resp.setStatus(HttpServletResponse.SC_OK);
resp.setContentType("text/plain");
resp.setContentLength(2);
resp.getOutputStream().print("ok");
servlet.log("data not complete recomputation: task_id=" + taskId + ", work_unit_id=" + workUnitId + ", user=" + user + ", hostname=" + hostname + ", hostaddr=" + hostaddr + ", buffer.length=" + buffer.length + ", bufferSize=" + bufferSize);
} else {
resp.setStatus(HttpServletResponse.SC_ACCEPTED);
servlet.log("data not complete: task_id=" + taskId + ", work_unit_id=" + workUnitId + ", user=" + user + ", hostname=" + hostname + ", hostaddr=" + hostaddr + ", buffer.length=" + buffer.length + ", bufferSize=" + bufferSize);
}
return;
}
if (processor != null) {
try {
processor.checkResult(processor.getWorkUnit(taskId, Long.parseLong(workUnitId), 1), buffer);
} catch (Exception e) {
resp.sendError(HttpServletResponse.SC_NOT_ACCEPTABLE);
servlet.log("data corrupted: task_id=" + taskId + ", work_unit_id=" + workUnitId + ", user=" + user + ", hostname=" + hostname + ", hostaddr=" + hostaddr);
return;
}
}
con = servlet.getConnection();
stmt = con.createStatement();
con.setAutoCommit(false);
if (processor != null) { if (!processor.processResult(stmt, processor.getWorkUnit(taskId, Long.parseLong(workUnitId), 1), buffer, false)) {
buffer = null;
}
}
pStmt = con.prepareStatement("INSERT INTO zeta.result (task_id,work_unit_id,stop,result) VALUES ("
+ taskId + ',' + workUnitId + ",'" + currentTime.toString() + "',?)");
pStmt.setBytes(1, buffer);
int affectetRows = pStmt.executeUpdate();
if (affectetRows == 0) {
resp.setStatus(HttpServletResponse.SC_OK);
resp.setContentType("text/plain");
resp.setContentLength(2);
resp.getOutputStream().print("ok");
servlet.log("entry not found: task_id=" + taskId + ", work_unit_id=" + workUnitId + ", user=" + user + ", hostname=" + hostname + ", hostaddr=" + hostaddr);
return;
}
DatabaseUtils.log(servlet.getServerId(), stmt,
"INSERT INTO zeta.result (task_id,work_unit_id,stop) VALUES ("
+ taskId + ',' + workUnitId + ",'" + currentTime.toString() + "')");
} catch (IOException ioe) {
resp.sendError(HttpServletResponse.SC_BAD_REQUEST);
servlet.log("IOException: task_id=" + taskId + ", work_unit_id=" + workUnitId + ", user=" + user + ", hostname=" + hostname + ", hostaddr=" + hostaddr);
return;
} finally {
if (con != null) {
con.commit();
con.setAutoCommit(true);
}
}
} else {
rs.close(); rs = stmt.executeQuery("SELECT stop FROM zeta.recomputation WHERE task_id=" + taskId + " AND work_unit_id=" + workUnitId);
if (rs.next() && rs.getTimestamp(1) == null) {
rs.close();
Timestamp currentTime = new Timestamp(System.currentTimeMillis());
try {
DatabaseUtils.close(stmt);
stmt = null;
DatabaseUtils.close(con);
con = null;
ByteArrayOutputStream out = new ByteArrayOutputStream(bufferSize);
StreamUtils.writeData(req.getInputStream(), out, false, true);
byte[] buffer = out.toByteArray();
if (buffer.length != bufferSize) {
if (dataNotCompleteWorkUnitId.equals(workUnitId)) {
++dataNotCompleteCount;
} else {
dataNotCompleteWorkUnitId = workUnitId;
dataNotCompleteCount = 1;
}
if (dataNotCompleteCount >= MAX_DATA_NOT_COMPLETE_COUNT) {
resp.setStatus(HttpServletResponse.SC_OK);
resp.setContentType("text/plain");
resp.setContentLength(2);
resp.getOutputStream().print("ok");
servlet.log("data not complete recomputation: task_id=" + taskId + ", work_unit_id=" + workUnitId + ", user=" + user + ", hostname=" + hostname + ", hostaddr=" + hostaddr + ", buffer.length=" + buffer.length + ", bufferSize=" + bufferSize);
} else {
resp.setStatus(HttpServletResponse.SC_ACCEPTED);
servlet.log("data not complete: task_id=" + taskId + ", work_unit_id=" + workUnitId + ", user=" + user + ", hostname=" + hostname + ", hostaddr=" + hostaddr + ", buffer.length=" + buffer.length + ", bufferSize=" + bufferSize);
}
return;
}
if (processor != null) {
try {
processor.checkResult(processor.getWorkUnit(taskId, Long.parseLong(workUnitId), 1), buffer);
} catch (Exception e) {
resp.sendError(HttpServletResponse.SC_NOT_ACCEPTABLE);
servlet.log("data corrupted: task_id=" + taskId + ", work_unit_id=" + workUnitId + ", user=" + user + ", hostname=" + hostname + ", hostaddr=" + hostaddr);
return;
}
}
con = servlet.getConnection();
stmt = con.createStatement();
con.setAutoCommit(false);
if (processor != null) {
if (!processor.processResult(stmt, processor.getWorkUnit(taskId, Long.parseLong(workUnitId), 1), buffer, true)) {
buffer = null;
}
}
pStmt = con.prepareStatement("UPDATE zeta.recomputation SET (stop,result)=('" + currentTime.toString() + "',?) WHERE task_id=" + taskId + " AND work_unit_id=" + workUnitId);
pStmt.setBytes(1, buffer);
int affectetRows = pStmt.executeUpdate();
if (affectetRows == 0) {
resp.setStatus(HttpServletResponse.SC_OK);
resp.setContentType("text/plain");
resp.setContentLength(2);
resp.getOutputStream().print("ok");
servlet.log("entry not found: task_id=" + taskId + ", work_unit_id=" + workUnitId + ", user=" + user + ", hostname=" + hostname + ", hostaddr=" + hostaddr);
return;
}
DatabaseUtils.log(servlet.getServerId(), stmt,
"UPDATE zeta.recomputation SET stop='" + currentTime.toString() + "' WHERE task_id=" + taskId + " AND work_unit_id=" + workUnitId);
} catch (IOException ioe) {
resp.sendError(HttpServletResponse.SC_BAD_REQUEST);
servlet.log("IOException: task_id=" + taskId + ", work_unit_id=" + workUnitId + ", user=" + user + ", hostname=" + hostname + ", hostaddr=" + hostaddr);
return;
} finally {
if (con != null) {
con.commit();
con.setAutoCommit(true);
}
}
} else {
rs.close();
servlet.log("work unit exists already: task_id=" + taskId + ", work_unit_id=" + workUnitId + ", user=" + user + ", hostname=" + hostname + ", hostaddr=" + hostaddr);
try {
StreamUtils.writeData(req.getInputStream(), null, false, false);
} catch (IOException ioe) {
servlet.log("IOException: task_id=" + taskId + ", work_unit_id=" + workUnitId + ", user=" + user + ", hostname=" + hostname + ", hostaddr=" + hostaddr);
}
}
}
} catch (SQLException e) {
throw e;
} finally {
DatabaseUtils.close(stmt);
DatabaseUtils.close(pStmt);
DatabaseUtils.close(con);
--concurrentConnections;
}
resp.setStatus(HttpServletResponse.SC_OK);
resp.setContentType("text/plain");
resp.setContentLength(2);
resp.getOutputStream().print("ok");
} else {
servlet.log("wrong content type: " + contentType);
resp.sendError(HttpServletResponse.SC_NOT_IMPLEMENTED);
}
}
private final int MAX_CONCURRENT_CONNECTIONS = 20;
private int concurrentConnections = 0;
private String dataNotCompleteWorkUnitId = "";
private short dataNotCompleteCount = 0;
private final short MAX_DATA_NOT_COMPLETE_COUNT = 5;
private ZetaServlet servlet;
private static String encryptionClass = "";
private static int encryptionClassTaskId = -1;
}