package proai.cache;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import net.sf.bvalid.Validator;
import org.apache.log4j.Logger;
import proai.MetadataFormat;
import proai.Record;
import proai.SetInfo;
import proai.driver.OAIDriver;
import proai.driver.RemoteIterator;
import proai.error.ImmediateShutdownException;
import proai.error.RepositoryException;
import proai.error.ServerException;
import proai.util.SetSpec;

/* loaded from: input_file:proai/cache/Updater.class */
public class Updater extends Thread {
    private static Logger _LOG = Logger.getLogger(Updater.class.getName());
    private int _pollSeconds;
    private int _maxWorkers;
    private int _maxWorkBatchSize;
    private int _maxFailedRetries;
    private int _maxCommitQueueSize;
    private int _maxRecordsPerTransaction;
    private OAIDriver _driver;
    private RCDatabase _db;
    private RCDisk _disk;
    private Validator _validator;
    private boolean _shutdownRequested;
    private boolean _immediateShutdownRequested;
    private QueueIterator _queueIterator;
    private Worker[] _workers;
    private Committer _committer;
    private boolean _processingAborted;
    private String _status;

    public Updater(OAIDriver oAIDriver, RecordCache recordCache, RCDatabase rCDatabase, RCDisk rCDisk, int i, int i2, int i3, int i4, int i5, int i6, Validator validator) {
        this._driver = oAIDriver;
        this._db = rCDatabase;
        this._disk = rCDisk;
        this._pollSeconds = i;
        this._maxWorkers = i2;
        this._maxWorkBatchSize = i3;
        this._maxFailedRetries = i4;
        this._maxCommitQueueSize = i5;
        this._maxRecordsPerTransaction = i6;
        this._validator = validator;
    }

    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        this._status = "Started";
        while (!this._shutdownRequested) {
            long currentTimeMillis = System.currentTimeMillis();
            _LOG.info("Update cycle initiated");
            try {
                this._status = "Processing any old items in queue";
                checkImmediateShutdown();
                processQueue("old");
                checkImmediateShutdown();
                this._status = "Polling and updating queue and database";
                pollAndUpdate();
                this._status = "Processing any new items in queue";
                checkImmediateShutdown();
                processQueue("new");
                checkImmediateShutdown();
                this._status = "Pruning old files from cache if needed";
                pruneIfNeeded();
                _LOG.info("Update cycle finished in " + ((System.currentTimeMillis() - currentTimeMillis) / 1000) + "sec.Next cycle scheduled in " + this._pollSeconds + "sec.");
            } catch (ImmediateShutdownException e) {
                _LOG.info("Update cycle aborted due to immediate shutdown request");
            } catch (Throwable th) {
                _LOG.error("Update cycle failed", th);
            }
            this._status = "Sleeping";
            for (int i = 0; !this._shutdownRequested && i < this._pollSeconds; i++) {
                try {
                    Thread.sleep(1000L);
                } catch (Exception e2) {
                }
            }
        }
        this._status = "Finished";
    }

    private void pruneIfNeeded() throws Exception {
        File file = null;
        PrintWriter printWriter = null;
        BufferedReader bufferedReader = null;
        try {
            Connection connection = RecordCache.getConnection();
            if (this._db.getPrunableCount(connection) > 0) {
                file = File.createTempFile("proai-prunable", ".txt");
                printWriter = new PrintWriter(new OutputStreamWriter(new FileOutputStream(file), "UTF-8"));
                int dumpPrunables = this._db.dumpPrunables(connection, printWriter);
                printWriter.close();
                _LOG.info("Pruning " + dumpPrunables + " old files from cache");
                bufferedReader = new BufferedReader(new InputStreamReader(new FileInputStream(file), "UTF-8"));
                int i = 0;
                int[] iArr = new int[32];
                for (String readLine = bufferedReader.readLine(); readLine != null; readLine = bufferedReader.readLine()) {
                    String[] split = readLine.split(" ");
                    if (split.length == 2) {
                        int parseInt = Integer.parseInt(split[0]);
                        File file2 = this._disk.getFile(split[1]);
                        if (!file2.exists()) {
                            _LOG.debug("No need to delete non-existing old cache file: " + split[1]);
                        } else if (file2.delete()) {
                            _LOG.debug("Deleted old cache file: " + split[1]);
                        } else {
                            _LOG.warn("Unable to delete old cache file (will try again later): " + split[1]);
                        }
                        int i2 = i;
                        i++;
                        iArr[i2] = parseInt;
                        if (i == iArr.length) {
                            this._db.deletePrunables(connection, iArr, i);
                            i = 0;
                        }
                    }
                }
                if (i > 0) {
                    this._db.deletePrunables(connection, iArr, i);
                }
            } else {
                _LOG.info("Pruning is not needed.");
            }
            if (printWriter != null) {
                try {
                    printWriter.close();
                } catch (Exception e) {
                }
                if (bufferedReader != null) {
                    try {
                        bufferedReader.close();
                    } catch (Exception e2) {
                    }
                }
            }
            if (file != null) {
                file.delete();
            }
            RecordCache.releaseConnection(connection);
        } catch (Throwable th) {
            if (0 != 0) {
                try {
                    printWriter.close();
                } catch (Exception e3) {
                }
                if (0 != 0) {
                    try {
                        bufferedReader.close();
                    } catch (Exception e4) {
                    }
                }
            }
            if (0 != 0) {
                file.delete();
            }
            RecordCache.releaseConnection(null);
            throw th;
        }
    }

    private void checkImmediateShutdown() throws ImmediateShutdownException {
        if (this._immediateShutdownRequested) {
            throw new ImmediateShutdownException();
        }
    }

    public void shutdown(boolean z) {
        if (isAlive()) {
            this._shutdownRequested = true;
            this._immediateShutdownRequested = z;
            while (isAlive()) {
                _LOG.info("Waiting for updater to finish.  Current status: " + this._status);
                try {
                    Thread.sleep(250L);
                } catch (Exception e) {
                }
            }
            _LOG.info("Updater shutdown complete");
        }
    }

    private void pollAndUpdate() throws ServerException {
        ServerException serverException;
        Connection connection = null;
        boolean z = false;
        try {
            try {
                connection = RecordCache.getConnection();
                connection.setAutoCommit(false);
                z = true;
                this._db.queueFailedRecords(connection, this._maxFailedRetries);
                if (this._db.isPollingEnabled(connection)) {
                    long time = this._driver.getLatestDate().getTime();
                    if (time > this._db.getEarliestPollDate(connection)) {
                        _LOG.info("Starting update process; source data of interest may have changed.");
                        checkImmediateShutdown();
                        updateIdentify(connection);
                        checkImmediateShutdown();
                        List<String> updateFormats = updateFormats(connection);
                        checkImmediateShutdown();
                        updateSets(connection);
                        checkImmediateShutdown();
                        queueUpdatedRecords(connection, updateFormats, time);
                    } else {
                        _LOG.info("Skipping update process; source data of interest has not changed");
                    }
                } else {
                    _LOG.info("Remote polling skipped -- polling is disabled");
                }
                connection.commit();
                try {
                    if (connection != null) {
                        if (1 != 0) {
                            try {
                                connection.setAutoCommit(false);
                            } catch (SQLException e) {
                                _LOG.error("Failed to set autoCommit to false", e);
                                RecordCache.releaseConnection(connection);
                                return;
                            }
                        }
                        RecordCache.releaseConnection(connection);
                    }
                } catch (Throwable th) {
                    RecordCache.releaseConnection(connection);
                    throw th;
                }
            } finally {
            }
        } catch (Throwable th2) {
            try {
                if (connection != null) {
                    if (z) {
                        try {
                            connection.setAutoCommit(false);
                        } catch (SQLException e2) {
                            _LOG.error("Failed to set autoCommit to false", e2);
                            RecordCache.releaseConnection(connection);
                            throw th2;
                        }
                    }
                    RecordCache.releaseConnection(connection);
                }
                throw th2;
            } catch (Throwable th3) {
                RecordCache.releaseConnection(connection);
                throw th3;
            }
        }
    }

    private void updateIdentify(Connection connection) throws Exception {
        _LOG.info("Getting 'Identify' xml from remote source...");
        this._db.setIdentifyPath(connection, this._disk.write(this._driver));
    }

    private List<String> updateFormats(Connection connection) throws Exception {
        _LOG.info("Updating metadata formats...");
        RemoteIterator<? extends MetadataFormat> listMetadataFormats = this._driver.listMetadataFormats();
        ArrayList arrayList = new ArrayList();
        while (listMetadataFormats.hasNext()) {
            try {
                checkImmediateShutdown();
                MetadataFormat next = listMetadataFormats.next();
                this._db.putFormat(connection, next);
                arrayList.add(next.getPrefix());
            } finally {
                try {
                    listMetadataFormats.close();
                } catch (Exception e) {
                    _LOG.warn("Unable to close remote metadata format iterator", e);
                }
            }
        }
        Iterator<CachedMetadataFormat> it = this._db.getFormats(connection).iterator();
        while (it.hasNext()) {
            String prefix = it.next().getPrefix();
            if (!arrayList.contains(prefix)) {
                checkImmediateShutdown();
                this._db.deleteFormat(connection, prefix);
            }
        }
        return arrayList;
    }

    private void updateSets(Connection connection) throws Exception {
        _LOG.info("Updating sets...");
        RemoteIterator<? extends SetInfo> listSetInfo = this._driver.listSetInfo();
        HashSet hashSet = new HashSet();
        HashSet<String> hashSet2 = new HashSet();
        while (listSetInfo.hasNext()) {
            try {
                checkImmediateShutdown();
                SetInfo next = listSetInfo.next();
                String setSpec = next.getSetSpec();
                if (SetSpec.hasParents(setSpec) && !hashSet.contains(SetSpec.parentOf(setSpec))) {
                    hashSet2.add(SetSpec.parentOf(setSpec));
                }
                this._db.putSetInfo(connection, setSpec, this._disk.write(next));
                hashSet.add(setSpec);
            } finally {
                try {
                    listSetInfo.close();
                } catch (Exception e) {
                    _LOG.warn("Unable to close remote set info iterator", e);
                }
            }
        }
        for (String str : hashSet2) {
            if (!SetSpec.isValid(str)) {
                throw new RepositoryException("SetSpec '" + str + "' is malformed");
            }
            for (String str2 : SetSpec.allSetsFor(str)) {
                if (!hashSet.contains(str2)) {
                    this._db.putSetInfo(connection, str2, this._disk.write(SetSpec.defaultInfoFor(str2)));
                    hashSet.add(str2);
                    _LOG.warn("Adding missing set: " + str2);
                }
            }
        }
        Iterator<SetInfo> it = this._db.getSetInfo(connection).iterator();
        while (it.hasNext()) {
            String setSpec2 = it.next().getSetSpec();
            if (!hashSet.contains(setSpec2)) {
                checkImmediateShutdown();
                this._db.deleteSet(connection, setSpec2);
            }
        }
    }

    private void queueUpdatedRecords(Connection connection, List<String> list, long j) throws Exception {
        _LOG.info("Querying and queueing updated records...");
        long currentTimeMillis = System.currentTimeMillis();
        int i = 0;
        for (String str : list) {
            long lastPollDate = this._db.getLastPollDate(connection, str);
            if (lastPollDate < j) {
                _LOG.info("Querying for changed " + str + " records because " + lastPollDate + " is less than " + j);
                checkImmediateShutdown();
                RemoteIterator<? extends Record> listRecords = this._driver.listRecords(new Date(lastPollDate), new Date(j), str);
                int i2 = 0;
                while (listRecords.hasNext()) {
                    try {
                        Record next = listRecords.next();
                        checkImmediateShutdown();
                        this._db.queueRemoteRecord(connection, next.getItemID(), next.getPrefix(), next.getSourceInfo());
                        i2++;
                    } catch (Throwable th) {
                        try {
                            listRecords.close();
                        } catch (Exception e) {
                            _LOG.warn("Unable to close remote record iterator", e);
                        }
                        throw th;
                    }
                }
                _LOG.info("Queued " + i2 + " new/modified " + str + " records.");
                this._db.setLastPollDate(connection, str, j);
                i += i2;
                try {
                    listRecords.close();
                } catch (Exception e2) {
                    _LOG.warn("Unable to close remote record iterator", e2);
                }
            } else {
                _LOG.info("Skipping " + str + " records because " + lastPollDate + " is not less than " + j);
            }
        }
        _LOG.info("Queued " + i + " total new/modified records in " + ((System.currentTimeMillis() - currentTimeMillis) / 1000) + "sec.");
    }

    private int countItemsInQueue() throws Exception {
        Connection connection = RecordCache.getConnection();
        try {
            int queueSize = this._db.getQueueSize(connection);
            RecordCache.releaseConnection(connection);
            return queueSize;
        } catch (Throwable th) {
            RecordCache.releaseConnection(connection);
            throw th;
        }
    }

    private QueueIterator newQueueIterator() throws Exception {
        Connection connection = null;
        File file = null;
        PrintWriter printWriter = null;
        try {
            connection = RecordCache.getConnection();
            file = File.createTempFile("proai-queue", ".txt");
            printWriter = new PrintWriter(new OutputStreamWriter(new FileOutputStream(file), "UTF-8"));
            this._db.dumpQueue(connection, printWriter);
            printWriter.close();
            QueueIterator queueIterator = new QueueIterator(file);
            if (printWriter != null) {
                try {
                    printWriter.close();
                } catch (Exception e) {
                }
            }
            if (file != null) {
                file.delete();
            }
            RecordCache.releaseConnection(connection);
            return queueIterator;
        } catch (Throwable th) {
            if (printWriter != null) {
                try {
                    printWriter.close();
                } catch (Exception e2) {
                }
            }
            if (file != null) {
                file.delete();
            }
            RecordCache.releaseConnection(connection);
            throw th;
        }
    }

    private void processQueue(String str) throws Exception {
        _LOG.info("Processing " + str + " records in queue...");
        int countItemsInQueue = countItemsInQueue();
        checkImmediateShutdown();
        if (countItemsInQueue <= 0) {
            _LOG.info("Queue is empty.  No processing needed.");
            return;
        }
        long currentTimeMillis = System.currentTimeMillis();
        this._processingAborted = false;
        while (countItemsInQueue > 0 && !this._processingAborted) {
            try {
                this._queueIterator = newQueueIterator();
                this._committer = new Committer(this, this._db, this._maxCommitQueueSize, this._maxRecordsPerTransaction);
                int i = countItemsInQueue / this._maxWorkBatchSize;
                if (i > this._maxWorkers) {
                    i = this._maxWorkers;
                }
                if (i == 0) {
                    i = 1;
                }
                _LOG.info("Queue has " + countItemsInQueue + " records.  Starting " + i + " worker threads for processing.");
                this._workers = new Worker[i];
                for (int i2 = 0; i2 < this._workers.length; i2++) {
                    this._workers[i2] = new Worker(i2 + 1, this._workers.length, this, this._driver, this._disk, this._validator);
                    this._workers[i2].start();
                }
                this._committer.start();
                while (this._committer.isAlive()) {
                    try {
                        Thread.sleep(1000L);
                    } catch (Exception e) {
                    }
                }
                checkImmediateShutdown();
                if (this._queueIterator != null) {
                    this._queueIterator.close();
                }
                if (this._workers != null) {
                    logProcessingStats(countItemsInQueue, System.currentTimeMillis() - currentTimeMillis);
                    this._workers = null;
                    this._committer = null;
                }
                countItemsInQueue = countItemsInQueue();
            } catch (Throwable th) {
                if (this._queueIterator != null) {
                    this._queueIterator.close();
                }
                if (this._workers != null) {
                    logProcessingStats(countItemsInQueue, System.currentTimeMillis() - currentTimeMillis);
                    this._workers = null;
                    this._committer = null;
                }
                throw th;
            }
        }
        if (this._processingAborted) {
            throw new ServerException("Queue processing was aborted due to unexpected error (see above)");
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void handleCommitException(Throwable th) {
        _LOG.warn("Processing aborted due to commit failure", th);
        this._processingAborted = true;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public List<QueueItem> getNextBatch(List<QueueItem> list) {
        ArrayList arrayList = null;
        if (!processingShouldStop()) {
            if (list != null) {
                this._committer.handoff(list);
            }
            try {
                synchronized (this._queueIterator) {
                    if (this._queueIterator.hasNext()) {
                        arrayList = new ArrayList();
                        while (this._queueIterator.hasNext() && arrayList.size() < this._maxWorkBatchSize) {
                            arrayList.add(this._queueIterator.next());
                        }
                    }
                }
            } catch (Throwable th) {
                _LOG.warn("Processing aborted due to commit failure", th);
                synchronized (this) {
                    this._processingAborted = true;
                    arrayList = null;
                }
            }
        }
        return arrayList;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public synchronized boolean processingShouldStop() {
        return this._processingAborted || this._immediateShutdownRequested;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public boolean anyWorkersAreRunning() {
        if (this._workers == null) {
            return false;
        }
        for (int i = 0; i < this._workers.length; i++) {
            if (this._workers[i].isAlive()) {
                return true;
            }
        }
        return false;
    }

    private void logProcessingStats(int i, long j) {
        StringBuffer stringBuffer = new StringBuffer();
        int processedCount = this._committer.getProcessedCount();
        stringBuffer.append("    Records processed        : " + processedCount + " of " + i + " on queue\n");
        stringBuffer.append("    Total processing time    : " + getHMSString(j) + "\n");
        stringBuffer.append("    Processing rate          : " + round(processedCount / (j / 1000.0d)) + " records/second\n");
        stringBuffer.append("    Workers spawned          : " + this._workers.length + " of " + this._maxWorkers + " maximum\n");
        int i2 = 0;
        int i3 = 0;
        long j2 = 0;
        for (int i4 = 0; i4 < this._workers.length; i4++) {
            i2 += this._workers[i4].getFailedCount();
            i3 += this._workers[i4].getAttemptedCount();
            j2 += this._workers[i4].getTotalFetchTime();
        }
        stringBuffer.append("    Failed record loads      : " + i2 + " of " + i3 + " attempted\n");
        stringBuffer.append("    Avg roundtrip fetch time : " + getHMSString(j2 / i3) + "\n");
        int transactionCount = this._committer.getTransactionCount();
        stringBuffer.append("    Total DB transactions    : " + transactionCount + "\n");
        stringBuffer.append("    Total transaction time   : " + getHMSString(this._committer.getTotalCommitTime()) + "\n");
        stringBuffer.append("    Avg time/transaction     : " + getHMSString(Math.round(this._committer.getTotalCommitTime() / transactionCount)) + "\n");
        stringBuffer.append("    Avg recs/transaction     : " + round(processedCount / transactionCount) + " of " + this._maxRecordsPerTransaction + " maximum\n");
        _LOG.info("A round of queue processing has finished.\n\nProcessing Stats:\n" + stringBuffer.toString());
    }

    private static double round(double d) {
        return Math.round(d * 100.0d) / 100.0d;
    }

    private static String getHMSString(long j) {
        StringBuffer stringBuffer = new StringBuffer();
        long j2 = j / 3600000;
        long j3 = j - (((j2 * 1000) * 60) * 60);
        long j4 = j3 / 60000;
        long j5 = j3 - ((j4 * 1000) * 60);
        long j6 = j5 / 1000;
        long j7 = j5 - (j6 * 1000);
        if (j2 > 0) {
            stringBuffer.append(j2 + " hours, ");
        }
        if (j4 > 0) {
            stringBuffer.append(j4 + " minutes, ");
        }
        stringBuffer.append(j6 + (j7 > 99 ? "." + j7 : j7 > 9 ? ".0" + j7 : j7 > 0 ? ".00" + j7 : ".000") + " seconds");
        return stringBuffer.toString();
    }
}
