cache service: move database updating to happen during download
This commit is contained in:
@@ -264,6 +264,11 @@ public class CacheClient implements AutoCloseable
|
|||||||
|
|
||||||
logger.info("Index {} has {} archives", i, indexData.getArchives().length);
|
logger.info("Index {} has {} archives", i, indexData.getArchives().length);
|
||||||
|
|
||||||
|
if (watcher != null)
|
||||||
|
{
|
||||||
|
watcher.indexComplete(index);
|
||||||
|
}
|
||||||
|
|
||||||
for (ArchiveData ad : indexData.getArchives())
|
for (ArchiveData ad : indexData.getArchives())
|
||||||
{
|
{
|
||||||
Archive existing = index.getArchive(ad.getId());
|
Archive existing = index.getArchive(ad.getId());
|
||||||
|
|||||||
@@ -25,9 +25,11 @@
|
|||||||
package net.runelite.cache.client;
|
package net.runelite.cache.client;
|
||||||
|
|
||||||
import net.runelite.cache.fs.Archive;
|
import net.runelite.cache.fs.Archive;
|
||||||
|
import net.runelite.cache.fs.Index;
|
||||||
|
|
||||||
@FunctionalInterface
|
|
||||||
public interface DownloadWatcher
|
public interface DownloadWatcher
|
||||||
{
|
{
|
||||||
|
void indexComplete(Index index);
|
||||||
|
|
||||||
void downloadComplete(Archive archive, byte[] data);
|
void downloadComplete(Archive archive, byte[] data);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -34,14 +34,13 @@ import net.runelite.cache.fs.Store;
|
|||||||
import net.runelite.cache.index.FileData;
|
import net.runelite.cache.index.FileData;
|
||||||
import net.runelite.http.service.cache.beans.ArchiveEntry;
|
import net.runelite.http.service.cache.beans.ArchiveEntry;
|
||||||
import net.runelite.http.service.cache.beans.CacheEntry;
|
import net.runelite.http.service.cache.beans.CacheEntry;
|
||||||
import net.runelite.http.service.cache.beans.FileEntry;
|
|
||||||
import net.runelite.http.service.cache.beans.IndexEntry;
|
import net.runelite.http.service.cache.beans.IndexEntry;
|
||||||
import org.sql2o.Connection;
|
import org.sql2o.Connection;
|
||||||
import org.sql2o.ResultSetIterable;
|
import org.sql2o.ResultSetIterable;
|
||||||
|
|
||||||
public class CacheStorage implements Storage
|
public class CacheStorage implements Storage
|
||||||
{
|
{
|
||||||
private CacheEntry cacheEntry;
|
private final CacheEntry cacheEntry;
|
||||||
private final CacheDAO cacheDao;
|
private final CacheDAO cacheDao;
|
||||||
private final Connection con;
|
private final Connection con;
|
||||||
|
|
||||||
@@ -52,16 +51,6 @@ public class CacheStorage implements Storage
|
|||||||
this.con = con;
|
this.con = con;
|
||||||
}
|
}
|
||||||
|
|
||||||
public CacheEntry getCacheEntry()
|
|
||||||
{
|
|
||||||
return cacheEntry;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setCacheEntry(CacheEntry cacheEntry)
|
|
||||||
{
|
|
||||||
this.cacheEntry = cacheEntry;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void init(Store store) throws IOException
|
public void init(Store store) throws IOException
|
||||||
{
|
{
|
||||||
@@ -102,30 +91,7 @@ public class CacheStorage implements Storage
|
|||||||
@Override
|
@Override
|
||||||
public void save(Store store) throws IOException
|
public void save(Store store) throws IOException
|
||||||
{
|
{
|
||||||
for (Index index : store.getIndexes())
|
throw new UnsupportedOperationException();
|
||||||
{
|
|
||||||
IndexEntry entry = cacheDao.findOrCreateIndex(con, cacheEntry, index.getId(), index.getCrc(), index.getRevision());
|
|
||||||
// this assumes nothing is associated to the cache yet
|
|
||||||
cacheDao.associateIndexToCache(con, cacheEntry, entry);
|
|
||||||
|
|
||||||
for (Archive archive : index.getArchives())
|
|
||||||
{
|
|
||||||
ArchiveEntry archiveEntry = cacheDao.findArchive(con, entry, archive.getArchiveId(),
|
|
||||||
archive.getNameHash(), archive.getCrc(), archive.getRevision());
|
|
||||||
if (archiveEntry == null)
|
|
||||||
{
|
|
||||||
byte[] hash = archive.getHash();
|
|
||||||
archiveEntry = cacheDao.createArchive(con, entry, archive.getArchiveId(),
|
|
||||||
archive.getNameHash(), archive.getCrc(), archive.getRevision(), hash);
|
|
||||||
|
|
||||||
for (FileData file : archive.getFileData())
|
|
||||||
{
|
|
||||||
cacheDao.associateFileToArchive(con, archiveEntry, file.getId(), file.getNameHash());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
cacheDao.associateArchiveToIndex(con, archiveEntry, entry);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|||||||
@@ -29,15 +29,21 @@ import io.minio.errors.InvalidEndpointException;
|
|||||||
import io.minio.errors.InvalidPortException;
|
import io.minio.errors.InvalidPortException;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.time.Instant;
|
import java.time.Instant;
|
||||||
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.Executors;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import net.runelite.cache.client.CacheClient;
|
import net.runelite.cache.client.CacheClient;
|
||||||
|
import net.runelite.cache.client.DownloadWatcher;
|
||||||
import net.runelite.cache.client.IndexInfo;
|
import net.runelite.cache.client.IndexInfo;
|
||||||
import net.runelite.cache.fs.Archive;
|
import net.runelite.cache.fs.Archive;
|
||||||
|
import net.runelite.cache.fs.Index;
|
||||||
import net.runelite.cache.fs.Store;
|
import net.runelite.cache.fs.Store;
|
||||||
|
import net.runelite.cache.index.FileData;
|
||||||
import net.runelite.http.api.RuneLiteAPI;
|
import net.runelite.http.api.RuneLiteAPI;
|
||||||
|
import net.runelite.http.service.cache.beans.ArchiveEntry;
|
||||||
import net.runelite.http.service.cache.beans.CacheEntry;
|
import net.runelite.http.service.cache.beans.CacheEntry;
|
||||||
import net.runelite.http.service.cache.beans.IndexEntry;
|
import net.runelite.http.service.cache.beans.IndexEntry;
|
||||||
import net.runelite.protocol.api.login.HandshakeResponseType;
|
import net.runelite.protocol.api.login.HandshakeResponseType;
|
||||||
@@ -95,8 +101,45 @@ public class CacheUpdater
|
|||||||
|
|
||||||
ExecutorService executor = Executors.newSingleThreadExecutor();
|
ExecutorService executor = Executors.newSingleThreadExecutor();
|
||||||
|
|
||||||
CacheClient client = new CacheClient(store, rsVersion,
|
CacheEntry newCache = created ? cache : cacheDao.createCache(con, rsVersion, Instant.now());
|
||||||
(Archive archive, byte[] data) -> executor.submit(new CacheUploader(minioClient, minioBucket, archive, data)));
|
|
||||||
|
CacheClient client = new CacheClient(store, rsVersion, new DownloadWatcher()
|
||||||
|
{
|
||||||
|
private final Map<Index, IndexEntry> indexEntryMap = new HashMap<>();
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void indexComplete(Index index)
|
||||||
|
{
|
||||||
|
IndexEntry entry = cacheDao.findOrCreateIndex(con, newCache, index.getId(), index.getCrc(), index.getRevision());
|
||||||
|
// this assumes nothing is associated to the cache yet
|
||||||
|
cacheDao.associateIndexToCache(con, newCache, entry);
|
||||||
|
indexEntryMap.put(index, entry);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void downloadComplete(Archive archive, byte[] data)
|
||||||
|
{
|
||||||
|
executor.submit(new CacheUploader(minioClient, minioBucket, archive, data));
|
||||||
|
|
||||||
|
IndexEntry entry = indexEntryMap.get(archive.getIndex());
|
||||||
|
ArchiveEntry archiveEntry = cacheDao.findArchive(con, entry, archive.getArchiveId(),
|
||||||
|
archive.getNameHash(), archive.getCrc(), archive.getRevision());
|
||||||
|
if (archiveEntry == null)
|
||||||
|
{
|
||||||
|
byte[] hash = archive.getHash();
|
||||||
|
archiveEntry = cacheDao.createArchive(con, entry, archive.getArchiveId(),
|
||||||
|
archive.getNameHash(), archive.getCrc(), archive.getRevision(), hash);
|
||||||
|
|
||||||
|
for (FileData file : archive.getFileData())
|
||||||
|
{
|
||||||
|
cacheDao.associateFileToArchive(con, archiveEntry, file.getId(), file.getNameHash());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
cacheDao.associateArchiveToIndex(con, archiveEntry, entry);
|
||||||
|
|
||||||
|
archive.setFileData(null); // don't need this anymore
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
client.connect();
|
client.connect();
|
||||||
HandshakeResponseType result = client.handshake().join();
|
HandshakeResponseType result = client.handshake().join();
|
||||||
@@ -117,11 +160,6 @@ public class CacheUpdater
|
|||||||
|
|
||||||
client.download();
|
client.download();
|
||||||
|
|
||||||
CacheEntry newCache = created ? cache : cacheDao.createCache(con, rsVersion, Instant.now());
|
|
||||||
|
|
||||||
storage.setCacheEntry(newCache);
|
|
||||||
store.save();
|
|
||||||
|
|
||||||
// ensure objects are added to the store before they become
|
// ensure objects are added to the store before they become
|
||||||
// visible in the database
|
// visible in the database
|
||||||
executor.shutdown();
|
executor.shutdown();
|
||||||
|
|||||||
Reference in New Issue
Block a user