package zeta.handler;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.StringReader;
import java.io.StringWriter;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Timestamp;
import java.util.Map;
import java.util.zip.Deflater;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import java.util.zip.ZipOutputStream;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.servlet.http.HttpUtils;
import zeta.ZetaServlet;
import zeta.util.DatabaseUtils;
import zeta.util.Parameter;
import zeta.util.StreamUtils;
import BlowfishJ.BlowfishECB;
/**
 * Exchanges queued SQL statements between servers.
 * <ul>
 * <li>GET without a <code>successful</code> parameter: collects the rows of
 *     <code>zeta.server_synchronization</code>, zips them, encrypts them with this
 *     server's key and writes the result as <code>application/octet-stream</code>.</li>
 * <li>GET with <code>successful</code> and <code>timestamp</code> parameters: deletes the
 *     queued rows older than <code>timestamp</code> and stores <code>successful</code> as
 *     <code>last_synchronization</code> of this server.</li>
 * <li>POST of <code>application/octet-stream</code>: decrypts and unzips the body with the
 *     key of the server named in the <code>Param-String</code> header and executes the
 *     contained statements.</li>
 * </ul>
 * NOTE(review): the payload is protected with Blowfish in ECB mode; this is part of the
 * wire format shared with the peer and is therefore left unchanged here.
 */
public class SynchronizationHandler implements GetHandler, PostHandler {

  /** Content type produced by {@link #doGet} and required by {@link #doPost}. */
  private static final String OCTET_STREAM = "application/octet-stream";

  /** Servlet that supplies database connections, the local server id and logging. */
  private final ZetaServlet servlet;

  /**
   * @param servlet servlet used for connections, the local server id and logging
   */
  public SynchronizationHandler(ZetaServlet servlet) {
    this.servlet = servlet;
  }

  /**
   * Either acknowledges a finished synchronization (parameter <code>successful</code> is
   * non-empty) or generates and sends the pending synchronization data.
   * If there is nothing to send, no response body is written.
   *
   * @throws ServletException if the server key is missing or a database error occurs
   * @throws IOException if the payload cannot be built or written
   */
  public void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
    Connection con = null;
    Statement stmt = null;
    try {
      con = servlet.getConnection();
      stmt = con.createStatement();
      int serverId = servlet.getServerId();
      String successful = req.getParameter("successful");
      if (successful != null && successful.length() > 0) {
        String timestamp = req.getParameter("timestamp");
        // Without a timestamp the acknowledgement is silently ignored (as before).
        if (timestamp != null) {
          acknowledgeSynchronization(con, serverId, successful, timestamp);
        }
      } else {
        sendSynchronizationData(con, stmt, serverId, resp);
      }
    } catch (SQLException e) {
      throw new ServletException(e);
    } finally {
      DatabaseUtils.close(stmt);
      DatabaseUtils.close(con);
    }
  }

  /**
   * Receives encrypted synchronization data and executes the contained statements.
   * Requests with another content type are answered with 501; a missing or malformed
   * <code>Param-String</code> header (or non-numeric <code>server_id</code> /
   * <code>statements_length</code>) is answered with 400.
   *
   * @throws ServletException if the server key is missing or a database error occurs
   * @throws IOException if the request body cannot be read or decoded
   */
  public void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
    // Constant-first comparison: getContentType() is null when the client sent no type.
    if (!OCTET_STREAM.equals(req.getContentType())) {
      resp.sendError(HttpServletResponse.SC_NOT_IMPLEMENTED);
      return;
    }
    int serverId;
    int statementsLength;
    try {
      String paramString = req.getHeader("Param-String");
      if (paramString == null) {
        throw new IllegalArgumentException("missing Param-String header");
      }
      Map parameter = HttpUtils.parseQueryString(paramString);
      // server_id ends up in SQL text below; forcing it to an int rules out injection.
      serverId = Integer.parseInt(getParameter(parameter, "server_id").trim());
      statementsLength = Integer.parseInt(getParameter(parameter, "statements_length").trim());
    } catch (IllegalArgumentException e) {
      // Also covers NumberFormatException and malformed query strings.
      resp.sendError(HttpServletResponse.SC_BAD_REQUEST, "Invalid Param-String header");
      return;
    }
    Connection con = null;
    Statement stmt = null;
    try {
      con = servlet.getConnection();
      stmt = con.createStatement();
      servlet.log("start synchronization with server " + serverId + " (" + statementsLength + ')');
      byte[] serverKey = loadServerKey(con, serverId);
      // getContentLength() is -1 when unknown; a negative capacity would throw.
      ByteArrayOutputStream out = new ByteArrayOutputStream(Math.max(req.getContentLength(), 32));
      StreamUtils.writeData(req.getInputStream(), out, false, true);
      BlowfishECB bfecb = new BlowfishECB(serverKey);
      byte[] statements = out.toByteArray();
      bfecb.decrypt(statements);
      bfecb.cleanUp();
      // Only the first statementsLength bytes are zip data; presumably the rest is
      // padding added before encryption -- TODO confirm against the sender.
      ZipInputStream zip = new ZipInputStream(new ByteArrayInputStream(statements, 0, statementsLength));
      ZipEntry zEntry = zip.getNextEntry();
      if (zEntry == null) {
        throw new IOException("An error occur in the synchronization data!");
      }
      out = new ByteArrayOutputStream(100000);
      StreamUtils.writeData(zip, out, true, true);
      int countStatements = executeStatements(stmt, serverId, new String(out.toByteArray(), "UTF-8"));
      servlet.log(String.valueOf(countStatements) + " synchronizations from server " + serverId + " are executed.");
    } catch (SQLException e) {
      throw new ServletException(e);
    } finally {
      DatabaseUtils.close(stmt);
      DatabaseUtils.close(con);
    }
    resp.setStatus(HttpServletResponse.SC_OK);
    resp.getOutputStream().print("ok");
  }

  /**
   * Removes the queued statements older than <code>timestamp</code> and records
   * <code>successful</code> as the last synchronization of this server.
   * Both values come from the request and are bound as parameters, never concatenated.
   */
  private void acknowledgeSynchronization(Connection con, int serverId, String successful, String timestamp)
      throws SQLException {
    PreparedStatement delete = null;
    PreparedStatement update = null;
    try {
      delete = con.prepareStatement("DELETE FROM zeta.server_synchronization WHERE timestamp<?");
      bindTimestamp(delete, 1, timestamp);
      delete.executeUpdate();
      // Same log line as the statement that used to be executed literally.
      servlet.log("UPDATE zeta.server SET last_synchronization='" + successful + "' WHERE server_id=" + serverId);
      update = con.prepareStatement("UPDATE zeta.server SET last_synchronization=? WHERE server_id=?");
      bindTimestamp(update, 1, successful);
      update.setInt(2, serverId);
      update.executeUpdate();
    } finally {
      DatabaseUtils.close(update);
      DatabaseUtils.close(delete);
    }
  }

  /**
   * Binds a timestamp given as text. Values in JDBC escape format (what
   * {@link Timestamp#toString()} produces) are bound as a typed timestamp; anything else
   * is bound as a string so that the database performs the conversion.
   */
  private static void bindTimestamp(PreparedStatement ps, int index, String value) throws SQLException {
    try {
      ps.setTimestamp(index, Timestamp.valueOf(value));
    } catch (IllegalArgumentException notJdbcFormat) {
      ps.setString(index, value);
    }
  }

  /**
   * Reads the encryption key of the given server.
   *
   * @throws ServletException if no (non-null) key is stored for the server
   */
  private static byte[] loadServerKey(Connection con, int serverId) throws SQLException, ServletException {
    PreparedStatement select = con.prepareStatement("SELECT key FROM zeta.server WHERE server_id=?");
    try {
      select.setInt(1, serverId);
      ResultSet rs = select.executeQuery();
      byte[] serverKey;
      try {
        serverKey = rs.next() ? rs.getBytes(1) : null;
      } finally {
        rs.close();
      }
      if (serverKey == null) {
        throw new ServletException("Missing key for server " + serverId);
      }
      return serverKey;
    } finally {
      DatabaseUtils.close(select);
    }
  }

  /**
   * Collects the queued statements, packs them and writes them to the response.
   * Writes nothing when no statement was collected.
   */
  private void sendSynchronizationData(Connection con, Statement stmt, int serverId, HttpServletResponse resp)
      throws ServletException, IOException, SQLException {
    servlet.log("generates synchronization data of this server " + serverId);
    byte[] serverKey = loadServerKey(con, serverId);

    int maxStmtSynchronization;
    try {
      maxStmtSynchronization = Integer.parseInt(Parameter.getValue(stmt, "max_stmt_synchronization", "0"));
    } catch (NumberFormatException nfe) {
      maxStmtSynchronization = 0;
    }

    // MAX() always yields one row; its value is NULL when the table is empty.
    Timestamp timestamp = null;
    ResultSet rs = stmt.executeQuery("SELECT MAX(timestamp) FROM zeta.server_synchronization");
    try {
      if (rs.next()) {
        timestamp = rs.getTimestamp(1);
      }
    } finally {
      rs.close();
    }

    StringWriter sWriter = new StringWriter(50000);
    Timestamp lastSynchronization = null;
    if (timestamp != null) {
      PreparedStatement select = con.prepareStatement(
          "SELECT sql_statement,timestamp FROM zeta.server_synchronization WHERE timestamp<? ORDER BY timestamp");
      try {
        select.setTimestamp(1, timestamp);
        rs = select.executeQuery();
        try {
          int count = 0;
          while (rs.next()) {
            // A limit of 0 is never reached, i.e. means "no limit". The row that hits
            // the limit is not sent; its timestamp becomes the reported upper bound.
            if (++count == maxStmtSynchronization) {
              timestamp = rs.getTimestamp(2);
              break;
            }
            String s = rs.getString(1) + '\n';
            sWriter.write(s);
            lastSynchronization = rs.getTimestamp(2);
          }
        } finally {
          rs.close();
        }
      } finally {
        DatabaseUtils.close(select);
      }
    }
    if (lastSynchronization == null) {
      return;
    }

    // Inner archive: the statements themselves.
    byte[] statements = sWriter.toString().getBytes("UTF-8");
    ByteArrayOutputStream out = new ByteArrayOutputStream(statements.length);
    ZipOutputStream zip = new ZipOutputStream(out);
    zip.setLevel(Deflater.BEST_COMPRESSION);
    zip.putNextEntry(new ZipEntry("synchronization.sql"));
    zip.write(statements);
    zip.flush();
    zip.close();
    out.close();
    statements = out.toByteArray();

    // The unpadded length is transmitted so the receiver can ignore what align8 adds
    // (presumably padding to a multiple of 8 bytes for the block cipher -- TODO confirm).
    int statementsLength = statements.length;
    statements = StreamUtils.align8(statements);
    BlowfishECB bfecb = new BlowfishECB(serverKey);
    bfecb.encrypt(statements);
    bfecb.cleanUp();

    // Outer archive: metadata entries plus the encrypted inner archive.
    out = new ByteArrayOutputStream(statements.length + 100);
    zip = new ZipOutputStream(out);
    zip.setLevel(Deflater.BEST_COMPRESSION);
    zip.putNextEntry(new ZipEntry("statementsLength"));
    zip.write(String.valueOf(statementsLength).getBytes("UTF-8"));
    zip.putNextEntry(new ZipEntry("timestamp"));
    zip.write(timestamp.toString().getBytes("UTF-8"));
    zip.putNextEntry(new ZipEntry("lastSynchronization"));
    zip.write(lastSynchronization.toString().getBytes("UTF-8"));
    zip.putNextEntry(new ZipEntry("synchronization.sql"));
    zip.write(statements);
    zip.flush();
    zip.close();
    out.close();

    resp.setContentType(OCTET_STREAM);
    resp.setContentLength(out.size());
    out.writeTo(resp.getOutputStream());
  }

  /**
   * Splits the script into statements and executes them one by one. A new statement
   * starts at every line beginning (after trimming, ignoring case) with INSERT or UPDATE;
   * all other lines are appended to the current statement.
   * NOTE(review): other statement kinds (e.g. DELETE) do not start a new statement and
   * are appended to the preceding one -- confirm the sender only emits INSERT/UPDATE.
   * A statement that fails is recorded in <code>zeta.error</code> instead of aborting.
   *
   * @return the number of statements processed (including those that failed)
   * @throws SQLException if recording a failed statement fails
   */
  private static int executeStatements(Statement stmt, int serverId, String script)
      throws SQLException, IOException {
    BufferedReader reader = new BufferedReader(new StringReader(script));
    try {
      int countStatements = 0;
      StringBuffer statement = new StringBuffer(1000);
      while (true) {
        String line = reader.readLine();
        if (statement.length() > 0) {
          if (line == null || startsStatement(line)) {
            try {
              stmt.executeUpdate(statement.toString());
            } catch (SQLException se) {
              // encodeName is presumably the project's SQL literal escaping; kept as is
              // because its exact behaviour is not visible here -- TODO confirm.
              stmt.executeUpdate("INSERT INTO zeta.error (server_id,timestamp,sql_statement) VALUES (" + serverId + ",CURRENT TIMESTAMP,'" + DatabaseUtils.encodeName(statement.toString()) + "')");
            }
            ++countStatements;
            statement.setLength(0);
          } else {
            statement.append('\n');
          }
        }
        if (line == null) {
          break;
        }
        statement.append(line);
      }
      return countStatements;
    } finally {
      reader.close();
    }
  }

  /** Tells whether the trimmed line begins with INSERT or UPDATE (case-insensitive). */
  private static boolean startsStatement(String line) {
    String trimLine = line.trim();
    return trimLine.regionMatches(true, 0, "INSERT", 0, 6)
        || trimLine.regionMatches(true, 0, "UPDATE", 0, 6);
  }

  /**
   * Returns the first value stored for <code>key</code> in a map of
   * <code>String[]</code> values, or the empty string if there is none.
   */
  private static String getParameter(Map parameter, String key) {
    String[] values = (String[])parameter.get(key);
    return (values == null || values.length == 0)? "" : values[0];
  }
}