package zeta;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.InetAddress;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLEncoder;
import java.net.UnknownHostException;
import java.security.SecureRandom;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.zip.Deflater;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import java.util.zip.ZipOutputStream;
import zeta.monitor.MonitoredOutputStream;
import zeta.monitor.MonitoredURLStreamHandler;
import zeta.util.Base64;
import zeta.util.StreamUtils;
public class WorkUnitManager {
/**
 * Creates the manager for the work units of the given task.
 * <p>
 * The constructor removes orphaned work unit files, reloads the list of
 * possible work units, submits already completed work units (unless an
 * unknown-host problem was recorded by {@code ZetaClient}) and, when both
 * {@code transfer.detect} and {@code transfer.asynchronous} are enabled,
 * starts a background thread that periodically retries the transfer.
 *
 * @param task the task whose work unit class is used both to recognise
 *             work unit files and as the comparator of the internal sets
 * @throws IllegalArgumentException if the work unit class of the task
 *             cannot be instantiated; the original exception is attached
 *             as the cause
 */
public WorkUnitManager(Task task) {
    this.task = task;
    try {
        // A work unit instance doubles as the comparator ordering all three sets.
        WorkUnit workUnitComperator = (WorkUnit)task.getWorkUnitClass().newInstance();
        possibleWorkUnits = new TreeSet(workUnitComperator);
        activeWorkUnits = new TreeSet(workUnitComperator);
        storeWorkUnits = new TreeSet(workUnitComperator);
    } catch (Exception e) {
        ZetaInfo.handle(e);
        // Keep the original failure reachable for whoever catches this.
        IllegalArgumentException wrapped = new IllegalArgumentException(e.getMessage());
        wrapped.initCause(e);
        throw wrapped;
    }
    cleanWorkUnits(null);
    retrievePossibleWorkUnits();
    if (!ZetaClient.isUnknownHostExceptionOccur()) {
        submit(null, false, true);
    }
    completedLocalWorkUnits = numberOfCompletedLocalWorkUnits();
    exitAfterWorkUnits = properties.get("exit", 0);
    exitAfterWorkUnitsActive = (exitAfterWorkUnits > 0);
    int transferDetect = properties.get("transfer.detect", 0);
    if (transferDetect > 0 && properties.get("transfer.asynchronous", "false").equals("true")) {
        Thread t = new Thread() {
            public void run() {
                while (true) {
                    // The properties are read again on every pass, so the raw value
                    // must be tested BEFORE it is clamped: clamping first would make
                    // the "detection disabled" test impossible to satisfy.
                    int configuredDetect = properties.get("transfer.detect", 0);
                    boolean asynchronous = properties.get("transfer.asynchronous", "false").equals("true");
                    if (!asynchronous || configuredDetect <= 0) {
                        // Detection is currently switched off; look again in a minute.
                        try {
                            Thread.sleep(60000);
                        } catch (InterruptedException ex) {
                            // deliberately ignored: the loop simply continues
                        }
                    } else {
                        // Never poll more often than every 300 seconds.
                        int intervalSeconds = Math.max(configuredDetect, 300);
                        if (numberOfCompletedLocalWorkUnits() > 0 && isResultServiceReachable()) {
                            submit(null, false, false);
                        }
                        try {
                            // long arithmetic: a large interval must not overflow int
                            Thread.sleep(intervalSeconds * 1000L);
                        } catch (InterruptedException ex) {
                            // deliberately ignored: the loop simply continues
                        }
                    }
                }
            }
        };
        t.start();
    }
}

/**
 * Probes whether a connection to the configured result URL can be opened.
 *
 * @return {@code true} if connecting succeeded, {@code false} on any exception
 */
private boolean isResultServiceReachable() {
    HttpURLConnection connection = null;
    try {
        URL url = new URL("http", properties.get("host.name", "www.zetagrid.net"), properties.get("host.port", 80),
                          properties.get("resultURL", "/servlet/service/result"));
        connection = (HttpURLConnection)url.openConnection();
        connection.setUseCaches(false);
        connection.setRequestMethod("GET");
        connection.setDoInput(true);
        connection.connect();
        return true;
    } catch (Exception e) {
        return false;
    } finally {
        if (connection != null) {
            connection.disconnect();
        }
    }
}
/**
 * Selects the next work unit to be processed from the files in the
 * current directory and marks it as active.
 * <p>
 * Among all files that initialise a work unit which is not completed,
 * is allowed (client is offline or the unit is a possible work unit)
 * and is not already active, the one ordered first by the work unit
 * comparator is chosen.
 *
 * @return a clone of the selected work unit, or {@code null} if none is
 *         available or an exception occurred while searching
 */
public WorkUnit getWorkUnit() {
    synchronized (activeWorkUnits) {
        cleanWorkUnits(null);
        WorkUnit best = null;
        try {
            final String[] names = new File(".").list();
            if (names == null) {
                return null;
            }
            final WorkUnit candidate = (WorkUnit)task.getWorkUnitClass().newInstance();
            for (int idx = 0; idx < names.length; ++idx) {
                // Only files describing an unfinished work unit are of interest.
                if (!candidate.init(names[idx]) || candidate.isCompleted()) {
                    continue;
                }
                // The candidate must be ordered before the best one found so far.
                if (best != null && candidate.compare(candidate, best) >= 0) {
                    continue;
                }
                // While online only work units announced as possible may be used.
                if (!offline && !possibleWorkUnits.contains(candidate)) {
                    continue;
                }
                if (activeWorkUnits.contains(candidate)) {
                    continue;
                }
                // The candidate object is reused for the next file, so keep a copy.
                best = (WorkUnit)candidate.clone();
            }
            if (best != null) {
                activeWorkUnits.add(best);
            }
        } catch (Exception e) {
            ZetaInfo.handle(e);
            return null;
        }
        return best;
    }
}
/**
 * Transfers results to the server and requests new work units.
 * <p>
 * With a non-{@code null} work unit only that unit is transferred; it must
 * be active and not already in transfer, otherwise the call returns
 * immediately. With {@code null} all completed local work units are
 * transferred; a bulk transfer already in progress makes the call return.
 * The work is done on a new thread if {@code transfer.asynchronous} is
 * {@code "true"}, otherwise on the calling thread.
 *
 * @param workUnit          the work unit to transfer, or {@code null} for all completed ones
 * @param exitAfterTransfer {@code true} to terminate the JVM after a successful transfer
 * @param outputExceptions  {@code true} to report transfer problems
 */
public void submit(final WorkUnit workUnit, final boolean exitAfterTransfer, final boolean outputExceptions) {
    synchronized (activeWorkUnits) {
        final boolean notActive = (workUnit != null) && !activeWorkUnits.contains(workUnit);
        if (notActive || storeWorkUnits.contains(workUnit)) {
            return;
        }
        // null is stored as well: it marks a running bulk transfer.
        storeWorkUnits.add(workUnit);
    }
    final Thread transfer = new Thread() {
        public void run() {
            try {
                if (workUnit != null) {
                    // Single work unit: reset the back-off used by the bulk branch.
                    wait = 60000;
                    if (storeData(workUnit, outputExceptions)) {
                        if (exitAfterTransfer
                            || (exitAfterWorkUnitsActive
                                && --exitAfterWorkUnits <= numberOfCompletedLocalWorkUnits()-completedLocalWorkUnits-getNumberOfActiveWorkUnits())) {
                            System.exit(1);
                        }
                        requestNewWorkUnits();
                        ZetaInfo.write(" ");
                    }
                    return;
                }
                // Bulk transfer of every completed local work unit.
                final int transferred = storeCompletedWorkUnits(outputExceptions);
                if (transferred > 0) {
                    if (exitAfterTransfer) {
                        System.exit(1);
                    }
                    if (exitAfterWorkUnitsActive) {
                        exitAfterWorkUnits -= transferred;
                        if (exitAfterWorkUnits <= numberOfCompletedLocalWorkUnits()-completedLocalWorkUnits-getNumberOfActiveWorkUnits()) {
                            System.exit(1);
                        }
                    }
                }
                final boolean requestFailed = (outputExceptions || transferred > 0) && !requestNewWorkUnits();
                ZetaInfo.write(" ");
                if (requestFailed) {
                    try {
                        Thread.sleep(wait);
                    } catch (InterruptedException ex) {
                    }
                    // Alternate between the short and the long waiting period.
                    wait = (wait == 60000)? 900000 : 60000;
                }
            } finally {
                synchronized (activeWorkUnits) {
                    if (workUnit != null) {
                        activeWorkUnits.remove(workUnit);
                    }
                    storeWorkUnits.remove(workUnit);
                }
            }
        }
    };
    final boolean asynchronous = properties.get("transfer.asynchronous", "false").equals("true");
    if (asynchronous) {
        transfer.start();
    } else {
        // Synchronous mode: execute the body on the caller's thread.
        transfer.run();
    }
}
/**
 * Transfers the data and log file of one completed work unit to the server.
 * <p>
 * Per attempt (at most two) the method
 * <ol>
 * <li>posts a two byte probe to the result URL and, on HTTP 202, reads the
 *     encryption class and its signature from the response,</li>
 * <li>verifies the signature, checks the disk space and encrypts both files
 *     into temporary {@code .$$$} copies,</li>
 * <li>zips the encrypted copies in memory and posts them to the result URL.</li>
 * </ol>
 * After a successful transfer the original data and log file are deleted;
 * otherwise only the temporary copies are removed.
 *
 * @param workUnit         the work unit to transfer; nothing is done unless it is completed
 * @param outputExceptions {@code true} to report exceptions via {@code ZetaInfo}
 * @return {@code true} if the work unit was transferred successfully
 */
private boolean storeData(WorkUnit workUnit, boolean outputExceptions) {
    synchronized (WorkUnitManager.class) {
        if (!workUnit.isCompleted()) {
            return false;
        }
        // Left self-seeded on purpose: seeding a fresh SecureRandom with the
        // current time can make its output predictable on some providers.
        SecureRandom rnd = new SecureRandom();
        boolean error = true;
        for (int tries = 0; error && tries < 2; ++tries) {
            error = false;
            String dataFilename = workUnit.getFilename();
            String logFilename = workUnit.getLogFilename();
            // Until the transfer succeeded these refer to the temporary encrypted copies.
            File dataFile = new File(dataFilename + ".$$$");
            File logFile = new File(logFilename + ".$$$");
            HttpURLConnection connection = null;
            OutputStream out = null;
            try {
                String hostname = properties.get("host.name", "www.zetagrid.net");
                InetAddress localHost = InetAddress.getLocalHost();
                String paramString = "task=" + URLEncoder.encode(task.getTaskname())
                                   + "&work_unit_id=" + workUnit.getWorkUnitId()
                                   + "&user=" + URLEncoder.encode(properties.get("name"))
                                   + "&hostname=" + URLEncoder.encode(localHost.getHostName().toLowerCase())
                                   + "&hostaddr=" + URLEncoder.encode(localHost.getHostAddress())
                                   + "&key=" + URLEncoder.encode(getKey());
                try {
                    paramString = ZetaClient.encryptURLFile(paramString);
                } catch (Throwable t) {
                    // best effort: the unencrypted parameter string is sent instead
                }

                // ---- step 1: probe the service and fetch the encryption class ----
                ZetaInfo.write("Checking connection to server " + hostname);
                URL url = new URL("http", hostname, properties.get("host.port", 80), properties.get("resultURL", "/servlet/service/result"));
                connection = (HttpURLConnection)url.openConnection();
                connection.setUseCaches(false);
                connection.setRequestProperty("Content-Length", "2");
                connection.setRequestProperty("Content-Type", "application/octet-stream");
                connection.setRequestProperty("Param-String", paramString);
                connection.setDoOutput(true);
                connection.setRequestMethod("POST");
                connection.connect();
                out = connection.getOutputStream();
                ZetaInfo.write("Checking availability of the services at " + hostname);
                out.write(new byte[] { 1, 2 });
                out.flush();
                int code = connection.getResponseCode();
                if (code == HttpURLConnection.HTTP_UNAVAILABLE) {
                    // Temporarily unavailable: retry if an attempt is left.
                    error = true;
                    continue;
                }
                if (code != HttpURLConnection.HTTP_ACCEPTED && code != HttpURLConnection.HTTP_NOT_ACCEPTABLE) {
                    ZetaInfo.write("The services at " + hostname + " are not available!");
                    return false;
                }
                byte[] keyEncryptorClass = null;
                String keySignature = null;
                ByteArrayOutputStream resultOut = new ByteArrayOutputStream(4 * 1024 * 1024);
                if (code == HttpURLConnection.HTTP_ACCEPTED) {
                    try {
                        // The response is a Base64 encoded zip with the entries
                        // "className" and "signature".
                        StreamUtils.writeData(connection.getInputStream(), resultOut, false, true);
                        ZipInputStream zip = new ZipInputStream(new ByteArrayInputStream(Base64.decode(resultOut.toString("UTF-8"))));
                        while (true) {
                            ZipEntry entry = zip.getNextEntry();
                            if (entry == null) {
                                break;
                            }
                            String name = entry.getName();
                            if (name.equals("className")) {
                                resultOut.reset();
                                StreamUtils.writeData(zip, resultOut, false, true);
                                keyEncryptorClass = resultOut.toByteArray();
                            } else if (name.equals("signature")) {
                                resultOut.reset();
                                StreamUtils.writeData(zip, resultOut, false, true);
                                keySignature = resultOut.toString("UTF-8");
                            } else {
                                throw new IOException("The signature contains a not valid value '" + name + '\'');
                            }
                        }
                    } catch (IOException ioe) {
                        ZetaInfo.handle(ioe);
                        keyEncryptorClass = null;
                        keySignature = null;
                    }
                }
                connection.disconnect();
                connection = null;
                if (keyEncryptorClass == null || keyEncryptorClass.length == 0) {
                    ZetaInfo.write("No encryption key is available at " + hostname);
                    return false;
                }
                if (keySignature == null || keySignature.length() == 0) {
                    ZetaInfo.write("No digital signature is available for the encryption key at " + hostname);
                    return false;
                }

                // ---- step 2: verify the signature and encrypt both files ----
                ZetaInfo.write("Check digital signature of the encryption class");
                FileInputStream fin = null;
                try {
                    if (!ZetaClient.verify(null, keySignature, keyEncryptorClass)) {
                        throw new IOException("Wrong signature for the encryption class");
                    }
                    // Keep a reference so the stream is closed in the finally block
                    // no matter whether the helper closes it itself.
                    fin = new FileInputStream(dataFilename);
                    if (!StreamUtils.checkAvailDiskSpace(fin, dataFile)) {
                        throw new IOException("Hard disk is full!");
                    }
                    resultOut.reset();
                    ZetaInfo.write("Encrypting work unit " + workUnit.getWorkUnitId());
                    ZetaClient.encrypt(rnd.nextInt(1024)+5, keyEncryptorClass, dataFilename, dataFilename + ".$$$");
                    ZetaClient.encrypt(rnd.nextInt(1024)+5, keyEncryptorClass, logFilename, logFilename + ".$$$");
                } catch (Error err) {
                    // Fallback when the verification raises an Error: encrypt
                    // without the downloaded class.
                    ZetaInfo.write("The digital signature cannot be verified. Please update your client!");
                    if (!checkAvailDiskSpace(dataFilename, dataFile)) {
                        throw new IOException("Hard disk is full!");
                    }
                    resultOut.reset();
                    ZetaInfo.write("Encrypting work unit " + workUnit.getWorkUnitId());
                    ZetaClient.encrypt(rnd.nextInt(1024)+5, dataFilename, dataFilename + ".$$$");
                    ZetaClient.encrypt(rnd.nextInt(1024)+5, logFilename, logFilename + ".$$$");
                } finally {
                    StreamUtils.close(fin);
                }

                // ---- step 3: zip the encrypted copies in memory and post them ----
                try {
                    ZipOutputStream zip = new ZipOutputStream(resultOut);
                    zip.setLevel(Deflater.BEST_COMPRESSION);
                    zip.putNextEntry(new ZipEntry(dataFile.getName()));
                    StreamUtils.writeData(new FileInputStream(dataFile), zip, true, false);
                    zip.putNextEntry(new ZipEntry(logFile.getName()));
                    StreamUtils.writeData(new FileInputStream(logFile), zip, true, false);
                    zip.close();
                } finally {
                    resultOut.close();
                }
                dataFile.delete();
                logFile.delete();
                ZetaInfo.write("Connecting server " + hostname);
                MonitoredOutputStream.setWorkUnitId(workUnit.getWorkUnitId());
                url = new URL("http", hostname, properties.get("host.port", 80), properties.get("resultURL", "/servlet/service/result"), new MonitoredURLStreamHandler());
                connection = (HttpURLConnection)url.openConnection();
                connection.setUseCaches(false);
                byte[] resultBuffer = resultOut.toByteArray();
                connection.setRequestProperty("Content-Length", Long.toString(resultBuffer.length));
                connection.setRequestProperty("Content-Type", "application/octet-stream");
                connection.setRequestProperty("Param-String", paramString);
                connection.setDoOutput(true);
                connection.setRequestMethod("POST");
                connection.connect();
                out = connection.getOutputStream();
                ZetaInfo.write("Server " + hostname + " connected");
                out.write(resultBuffer);
                out.flush();
                ZetaInfo.write("Transfering work unit " + workUnit.getWorkUnitId());
                resultBuffer = null;
                code = connection.getResponseCode();
                if (code != HttpURLConnection.HTTP_OK) {
                    ZetaInfo.write("Could not store result: return code " + code);
                    System.err.println("Could not store result: return code " + code);
                    error = true;
                    if (code == HttpURLConnection.HTTP_INTERNAL_ERROR || code == HttpURLConnection.HTTP_UNAVAILABLE) {
                        // Server side problem: retry if an attempt is left.
                        continue;
                    }
                    return false;
                }
                ZetaInfo.write("Work unit " + workUnit.getWorkUnitId() + " successfully transferred");
                // Success: re-point the handles at the ORIGINAL files so that the
                // finally block below removes the transferred work unit.
                dataFile = new File(dataFilename);
                logFile = new File(logFilename);
            } catch (UnknownHostException uhe) {
                error = true;
                tries = 1; // a retry is pointless, end the loop
                if (outputExceptions) {
                    ZetaInfo.handle(uhe);
                }
            } catch (IOException e) {
                error = true;
                try {
                    // Only an internal server error is worth a second attempt.
                    if (connection == null || connection.getResponseCode() != HttpURLConnection.HTTP_INTERNAL_ERROR) {
                        tries = 1;
                    }
                } catch (IOException ioe) {
                    // response code unavailable: keep the current retry state
                }
                if (tries == 1 && outputExceptions) {
                    ZetaInfo.handle(e);
                }
                try {
                    Thread.sleep(4000);
                } catch (InterruptedException ex) {
                    // deliberately ignored
                }
            } finally {
                StreamUtils.close(out);
                if (connection != null) {
                    connection.disconnect();
                    connection = null;
                }
                // Deletes the temporary copies, or the originals after a success.
                dataFile.delete();
                logFile.delete();
                cleanWorkUnits(null);
                System.gc();
            }
        }
        return !error;
    }
}
/**
 * Transfers every completed and valid work unit found in the current
 * directory to the server.
 *
 * @param outputExceptions {@code true} to report exceptions via {@code ZetaInfo}
 * @return the number of work units transferred successfully, or {@code -1}
 *         if an exception occurred before any work unit was transferred
 */
private int storeCompletedWorkUnits(boolean outputExceptions) {
    synchronized (WorkUnitManager.class) {
        int transferred = 0;
        try {
            final String[] names = new File(".").list();
            if (names != null) {
                final WorkUnit unit = (WorkUnit)task.getWorkUnitClass().newInstance();
                for (int idx = 0; idx < names.length; ++idx) {
                    if (!unit.init(names[idx]) || !unit.isCompleted()) {
                        continue;
                    }
                    // The data file must still exist before a transfer is attempted.
                    final File data = new File(unit.getFilename());
                    if (data.exists() && unit.isValid() && storeData(unit, outputExceptions)) {
                        ++transferred;
                    }
                }
            }
        } catch (Exception e) {
            if (outputExceptions) {
                ZetaInfo.handle(e);
            }
            // Signal the failure only if nothing was transferred at all.
            if (transferred == 0) {
                transferred = -1;
            }
        }
        return transferred;
    }
}
/**
 * Requests new work units from the server if fewer than the configured
 * number ({@code work_units}) of recent work units are available locally.
 * <p>
 * Nothing is requested when {@code processors} is not positive. When
 * enough local work units (modified within the last two days) exist, only
 * the cached list of possible work units is reloaded. Otherwise up to two
 * attempts are made to fetch the list from the request URL; a second
 * attempt is only made after an I/O failure for which the server answered
 * with an internal server error.
 *
 * @return {@code true} if enough work units are present or the server
 *         response was stored, {@code false} otherwise
 */
private boolean requestNewWorkUnits() {
    synchronized (WorkUnitManager.class) {
        if (properties.get("processors", 1) <= 0) {
            return false;
        }
        int workUnits = properties.get("work_units", 1);
        if (workUnits <= numberOfLocalWorkUnits(2*24*60*60*1000)) {
            retrievePossibleWorkUnits();
            return true;
        }
        String workUnitSize = properties.get("work_unit_size", "m");
        String eMail = properties.get("eMail");
        if (eMail == null) {
            eMail = "";
        }
        // The parameter is only sent when the property is set at all.
        String messages = properties.get("messages", null);
        if (messages == null) {
            messages = "";
        } else {
            messages = (messages.equals("true"))? "&messages=true" : "&messages=false";
        }
        String team = properties.get("team");
        if (team == null) {
            team = "";
        }
        String version = task.getVersion();
        for (int tries = 0; tries < 2; ++tries) {
            HttpURLConnection connection = null;
            BufferedReader response = null;
            try {
                InetAddress localHost = InetAddress.getLocalHost();
                ZetaInfo.write("Requesting new work units");
                String urlFile = properties.get("requestURL", "/servlet/service/requestWorkUnit")
                               + "?user=" + URLEncoder.encode(properties.get("name"))
                               + "&email=" + URLEncoder.encode(eMail)
                               + messages
                               + "&team=" + URLEncoder.encode(team)
                               + "&task=" + URLEncoder.encode(task.getTaskname())
                               + "&size=" + URLEncoder.encode(workUnitSize)
                               + "&hostname=" + URLEncoder.encode(localHost.getHostName().toLowerCase())
                               + "&hostaddr=" + URLEncoder.encode(localHost.getHostAddress())
                               + "&key=" + URLEncoder.encode(getKey())
                               + "&version=" + URLEncoder.encode(version)
                               + "&work_units=" + workUnits
                               + "&os_name=" + URLEncoder.encode(System.getProperty("os.name", "?"))
                               + "&os_version=" + URLEncoder.encode(System.getProperty("os.version", "?"))
                               + "&os_arch=" + URLEncoder.encode(System.getProperty("os.arch", "?"))
                               + "&processors=" + properties.get("processors", 1);
                try {
                    urlFile = ZetaClient.encryptURLFile(urlFile);
                } catch (Throwable t) {
                    // best effort: the unencrypted request is sent instead
                }
                URL url = new URL("http", properties.get("host.name", "www.zetagrid.net"), properties.get("host.port", 80), urlFile);
                connection = (HttpURLConnection)url.openConnection();
                connection.setUseCaches(false);
                connection.setRequestMethod("GET");
                connection.setDoInput(true);
                connection.connect();
                response = new BufferedReader(new InputStreamReader(connection.getInputStream()));
                storePossibleWorkUnits(response);
                return true;
            } catch (UnknownHostException uhe) {
                tries = 1; // a retry is pointless, end the loop
                ZetaInfo.handle(uhe);
            } catch (IOException e) {
                // Also covers MalformedURLException, which is an IOException.
                try {
                    // Only an internal server error is worth a second attempt.
                    if (connection == null || connection.getResponseCode() != HttpURLConnection.HTTP_INTERNAL_ERROR) {
                        tries = 1;
                    }
                } catch (IOException ioe) {
                    // response code unavailable: keep the current retry state
                }
                if (tries == 1) {
                    ZetaInfo.handle(e);
                }
                try {
                    Thread.sleep(4000);
                } catch (InterruptedException ex) {
                    // deliberately ignored
                }
            } catch (Exception e) {
                ZetaInfo.handle(e);
                return false;
            } finally {
                // Release the response reader before the connection is dropped.
                StreamUtils.close(response);
                if (connection != null) {
                    connection.disconnect();
                    connection = null;
                }
            }
        }
        return false;
    }
}
/**
 * Returns how many work units are currently marked as active.
 *
 * @return the size of the active work unit set, read while holding its lock
 */
private int getNumberOfActiveWorkUnits() {
    final int activeCount;
    synchronized (activeWorkUnits) {
        activeCount = activeWorkUnits.size();
    }
    return activeCount;
}
/**
 * Counts the work unit files in the current directory that were modified
 * recently and are allowed to be processed.
 *
 * @param notOlderThanMillis maximum age of a file in milliseconds (exclusive)
 * @return the number of matching files, or {@code 0} if an exception occurred
 */
private int numberOfLocalWorkUnits(long notOlderThanMillis) {
    synchronized (activeWorkUnits) {
        try {
            final File[] entries = new File(".").listFiles();
            if (entries == null) {
                return 0;
            }
            final WorkUnit probe = (WorkUnit)task.getWorkUnitClass().newInstance();
            final long now = System.currentTimeMillis();
            int matches = 0;
            for (int idx = 0; idx < entries.length; ++idx) {
                final File entry = entries[idx];
                if (!probe.init(entry.getName())) {
                    continue;
                }
                if (now-entry.lastModified() >= notOlderThanMillis) {
                    continue;
                }
                // While online only work units announced as possible are counted.
                if (offline || possibleWorkUnits.contains(probe)) {
                    ++matches;
                }
            }
            return matches;
        } catch (Exception e) {
            ZetaInfo.handle(e);
            return 0;
        }
    }
}
/**
 * Counts the completed work units in the current directory that are
 * allowed to be processed.
 *
 * @return the number of completed work units, or {@code 0} if an exception occurred
 */
private int numberOfCompletedLocalWorkUnits() {
    synchronized (activeWorkUnits) {
        try {
            final String[] names = new File(".").list();
            if (names == null) {
                return 0;
            }
            final WorkUnit probe = (WorkUnit)task.getWorkUnitClass().newInstance();
            int completed = 0;
            for (int idx = 0; idx < names.length; ++idx) {
                if (!probe.init(names[idx]) || !probe.isCompleted()) {
                    continue;
                }
                // While online only work units announced as possible are counted.
                if (offline || possibleWorkUnits.contains(probe)) {
                    ++completed;
                }
            }
            return completed;
        } catch (Exception e) {
            ZetaInfo.handle(e);
            return 0;
        }
    }
}
/**
 * Removes stale and orphaned work unit files from the current directory.
 * <p>
 * If a list is given, log files that are not contained in it and were not
 * modified for more than 24 hours are deleted first. Afterwards every
 * file of a completed work unit whose data file is missing, and every
 * data file whose log file is missing, is deleted.
 *
 * @param workUnitFiles names of the files to keep, or {@code null} to skip
 *                      the age based clean-up
 */
private void cleanWorkUnits(List workUnitFiles) {
    try {
        final WorkUnit probe = (WorkUnit)task.getWorkUnitClass().newInstance();
        final File[] entries = new File(".").listFiles();
        if (entries == null) {
            return;
        }
        if (workUnitFiles != null) {
            final long now = System.currentTimeMillis();
            final long maxAgeMillis = 24*60*60*1000;
            for (int idx = 0; idx < entries.length; ++idx) {
                final File entry = entries[idx];
                final boolean staleLog = probe.isLogFilename(entry.getName())
                                      && !workUnitFiles.contains(entry.getName())
                                      && now-entry.lastModified() > maxAgeMillis;
                if (staleLog) {
                    entry.delete();
                }
            }
        }
        for (int idx = 0; idx < entries.length; ++idx) {
            final File entry = entries[idx];
            final String name = entry.getName();
            // The file that has to exist for 'entry' to be kept.
            final File counterpart;
            if (probe.init(name) && probe.isCompleted()) {
                counterpart = new File(probe.getFilename());
            } else if (probe.isFilename(name)) {
                counterpart = new File(probe.getLogFilename(name));
            } else {
                continue;
            }
            if (!counterpart.exists()) {
                entry.delete();
            }
        }
    } catch (Exception e) {
        ZetaInfo.handle(e);
    }
}
/**
 * Checks whether there is room for a copy of a file by copying it to the
 * destination, comparing the lengths and deleting the copy again.
 *
 * @param src  name of the file to copy
 * @param dest the temporary destination; it is always deleted afterwards
 * @return {@code true} if the copy has the same length as the source
 * @throws IOException if opening or copying the files fails
 */
private boolean checkAvailDiskSpace(String src, File dest) throws IOException {
    ZetaInfo.write("Check available disk space");
    final File source = new File(src);
    FileInputStream in = null;
    FileOutputStream copy = null;
    final boolean sameLength;
    try {
        in = new FileInputStream(src);
        copy = new FileOutputStream(dest);
        StreamUtils.writeData(in, copy, false, false);
    } finally {
        StreamUtils.close(in);
        StreamUtils.close(copy);
        // Compare only after both streams are closed, then drop the trial copy.
        sameLength = (source.length() == dest.length());
        dest.delete();
    }
    return sameLength;
}
/**
 * Reloads the set of possible work units from the file
 * {@code <local host name>.tmp} in the current directory.
 * <p>
 * A missing file is not an error. Afterwards the client counts as offline
 * exactly when no possible work unit could be loaded.
 */
private void retrievePossibleWorkUnits() {
    offline = true;
    synchronized (activeWorkUnits) {
        possibleWorkUnits.clear();
        BufferedReader cache = null;
        try {
            final WorkUnit probe = (WorkUnit)task.getWorkUnitClass().newInstance();
            cache = new BufferedReader(new FileReader(InetAddress.getLocalHost().getHostName() + ".tmp"));
            for (String entry = cache.readLine(); entry != null; entry = cache.readLine()) {
                if (probe.init(entry) && probe.isValid()) {
                    // The probe is reused for the next line, so store a copy.
                    possibleWorkUnits.add(probe.clone());
                }
            }
        } catch (FileNotFoundException e) {
            // no list has been stored yet
        } catch (Exception e) {
            ZetaInfo.handle(e);
        } finally {
            StreamUtils.close(cache);
            offline = possibleWorkUnits.isEmpty();
        }
    }
}
/**
 * Replaces the set of possible work units by the list read from the given
 * reader and stores it in the file {@code <local host name>.tmp}.
 * <p>
 * If the list could be parsed, files not covered by it are cleaned up
 * afterwards, even when storing the list failed.
 *
 * @param workUnits reader providing the work unit list sent by the server
 */
private void storePossibleWorkUnits(BufferedReader workUnits) {
    synchronized (activeWorkUnits) {
        List announced = null;
        BufferedWriter cache = null;
        try {
            final WorkUnit probe = (WorkUnit)task.getWorkUnitClass().newInstance();
            announced = probe.parseWorkUnitFiles(workUnits);
            possibleWorkUnits.clear();
            final int total = announced.size();
            for (int idx = 0; idx < total; ++idx) {
                final String name = (String)announced.get(idx);
                if (probe.init(name) && probe.isValid()) {
                    // The probe is reused for the next entry, so store a copy.
                    possibleWorkUnits.add(probe.clone());
                }
            }
            offline = false;
            cache = new BufferedWriter(new FileWriter(InetAddress.getLocalHost().getHostName() + ".tmp"));
            for (Iterator it = possibleWorkUnits.iterator(); it.hasNext(); ) {
                final WorkUnit possible = (WorkUnit)it.next();
                cache.write(possible.getLogFilename());
                cache.newLine();
            }
        } catch (Exception ioe) {
            ZetaInfo.handle(ioe);
        } finally {
            StreamUtils.close(cache);
            if (announced != null) {
                cleanWorkUnits(announced);
            }
        }
    }
}
/**
 * Returns the key that identifies this client in requests to the server.
 * <p>
 * The key is taken from {@code ZetaClient.getKey()}. If that call fails,
 * a fallback is built from the user name, the e-mail address and the local
 * host name, lower-cased and Base64 encoded.
 *
 * @return the key; an empty string if the fallback could not be built
 */
private String getKey() {
    try {
        return ZetaClient.getKey();
    } catch (Throwable unavailable) {
        // fall through to the locally derived key
    }
    String fallback = "";
    try {
        fallback = properties.get("name") + properties.get("eMail") + InetAddress.getLocalHost().getHostName();
        fallback = Base64.encode(fallback.toLowerCase().getBytes("UTF-8"));
    } catch (Exception ignored) {
        // best effort: whatever was assigned so far is returned
    }
    return fallback;
}
// Configuration source; values are looked up by key at the time of use.
private ZetaProperties properties = new ZetaProperties();
// Task whose work unit class is instantiated to recognise work unit files.
private Task task;
// Work units that may be processed: loaded from the "<host name>.tmp" file or from the server response.
private Set possibleWorkUnits;
// Work units handed out by getWorkUnit(); this set also serves as the lock guarding the sets and directory scans.
private Set activeWorkUnits;
// Work units whose transfer is in progress; null is stored while a bulk transfer of all completed units runs.
private Set storeWorkUnits;
// True when no possible work units could be loaded; set to false once a server list has been stored.
private boolean offline = false;
// Number of completed local work units counted when the manager was constructed.
private int completedLocalWorkUnits;
// Value of the "exit" property, decremented as work units are transferred.
private int exitAfterWorkUnits = 0;
// True if the "exit" property was greater than zero at construction.
private boolean exitAfterWorkUnitsActive = false;
// Milliseconds to sleep after a failed work unit request; alternates between 60000 and 900000.
private int wait = 60000;
}