cache client: batch requested archives instead of requesting individually

This commit is contained in:
Adam
2017-05-31 20:51:36 -04:00
parent c7af537df7
commit f81592c320
2 changed files with 144 additions and 95 deletions

View File

@@ -43,7 +43,6 @@ import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import net.runelite.cache.IndexType;
import net.runelite.cache.downloader.requests.ConnectionInfo;
import net.runelite.cache.downloader.requests.FileRequest;
import net.runelite.cache.downloader.requests.HelloHandshake;
@@ -60,7 +59,9 @@ public class CacheClient implements AutoCloseable
private static final String HOST = "oldschool1.runescape.com";
private static final int PORT = 43594;
private static final int CLIENT_REVISION = 139;
private static final int CLIENT_REVISION = 142;
private static final int MAX_REQUESTS = 19; // too many and the server closes the connection
private final Store store; // store cache will be written to
private final String host;
@@ -175,7 +176,7 @@ public class CacheClient implements AutoCloseable
{
Stopwatch stopwatch = Stopwatch.createStarted();
FileResult result = requestFile(255, 255).get();
FileResult result = requestFile(255, 255, true).get();
result.decompress(null);
ByteBuf buffer = Unpooled.wrappedBuffer(result.getContents());
@@ -212,7 +213,7 @@ public class CacheClient implements AutoCloseable
logger.info("Downloading index {}", i);
FileResult indexFileResult = requestFile(255, i).get();
FileResult indexFileResult = requestFile(255, i, true).get();
indexFileResult.decompress(null);
logger.info("Downloaded index {}", i);
@@ -261,14 +262,12 @@ public class CacheClient implements AutoCloseable
oldArchive.getRevision(), archive.getRevision());
}
FileResult archiveFileResult = requestFile(index.getId(), archive.getArchiveId()).get();
byte[] compressedContents = archiveFileResult.getCompressedData();
archive.setData(compressedContents);
if (index.getId() != IndexType.MAPS.getNumber())
CompletableFuture<FileResult> future = requestFile(index.getId(), archive.getArchiveId(), false);
future.handle((fr, ex) ->
{
archive.decompressAndLoad(null);
}
archive.setData(fr.getCompressedData());
return null;
});
}
else
{
@@ -290,32 +289,68 @@ public class CacheClient implements AutoCloseable
}
}
// flush any pending requests
channel.flush();
while (!requests.isEmpty())
{
// wait for pending requests
synchronized (this)
{
wait();
}
}
stopwatch.stop();
logger.info("Download completed in {}", stopwatch);
}
public synchronized CompletableFuture<FileResult> requestFile(int index, int fileId)
public synchronized CompletableFuture<FileResult> requestFile(int index, int fileId, boolean flush)
{
if (state != ClientState.CONNECTED)
{
throw new IllegalStateException("Can't request files until connected!");
}
if (!flush)
{
while (requests.size() >= MAX_REQUESTS)
{
channel.flush();
try
{
wait();
}
catch (InterruptedException ex)
{
logger.warn("interrupted while waiting for requests", ex);
}
}
}
ByteBuf buf = Unpooled.buffer(4);
FileRequest request = new FileRequest(index, fileId);
CompletableFuture<FileResult> future = new CompletableFuture<>();
PendingFileRequest pf = new PendingFileRequest(request, future);
buf.writeByte(request.getIndex() == 255 ? 1 : 0);
buf.writeByte(0); // 0/1 - seems to be a priority?
int hash = pf.computeHash();
buf.writeMedium(hash);
logger.trace("Sending request for {}/{}", index, fileId);
channel.writeAndFlush(buf);
requests.add(pf);
if (!flush)
{
channel.write(buf);
}
else
{
channel.writeAndFlush(buf);
}
return future;
}
@@ -343,6 +378,8 @@ public class CacheClient implements AutoCloseable
requests.remove(pr);
notify();
FileResult result = new FileResult(index, file, compressedData);
logger.debug("File download finished for index {} file {}, length {}", index, file, compressedData.length);

View File

@@ -22,7 +22,6 @@
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package net.runelite.cache.downloader;
import io.netty.buffer.ByteBuf;
@@ -46,6 +45,12 @@ public class CacheClientHandler extends ChannelInboundHandlerAdapter
this.client = client;
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception
{
	// The remote end dropped the connection (per MAX_REQUESTS, the server
	// closes the connection when too many requests are outstanding).
	// Log it so silent disconnects during a download are visible.
	logger.warn("Channel has gone inactive");
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
{
@@ -63,108 +68,121 @@ public class CacheClientHandler extends ChannelInboundHandlerAdapter
if (response != HelloHandshake.RESPONSE_OK)
{
if (response == HelloHandshake.RESPONSE_OUTDATED)
{
logger.warn("Client version is outdated");
}
else
{
logger.warn("Handshake response error {}", response);
}
}
client.onHandshake(response);
}
else if (state == ClientState.CONNECTED)
{
if (buffer.readableBytes() < 8)
{
logger.trace("Connected, but not enough data yet to read header");
return;
}
ByteBuf copy = buffer.slice();
int index = copy.readUnsignedByte();
int file = copy.readUnsignedShort();
// decompress() starts reading here
int compression = copy.readUnsignedByte();
int compressedFileSize = copy.readInt();
int size = compressedFileSize
+ 5 // 1 byte compression type, 4 byte compressed size
+ (compression != 0 ? 4 : 0); // compression has leading 4 byte decompressed length
int breaks = calculateBreaks(size);
// 3 for index/file
if (size + 3 + breaks > buffer.readableBytes())
{
logger.trace("Index {} archive {}: Not enough data yet {} > {}", index, file, size + 3 + breaks, buffer.readableBytes());
return;
}
byte[] compressedData = new byte[size];
int compressedDataOffset = 0;
int totalRead = 3;
buffer.skipBytes(3); // skip index/file
for (int i = 0; i < breaks + 1; ++i)
{
int bytesInBlock = 512 - (totalRead % 512);
int bytesToRead = Math.min(bytesInBlock, size - compressedDataOffset);
logger.trace("{}/{}: reading block {}/{}, read so far this block: {}, file status: {}/{}",
index, file,
(totalRead % 512), 512,
bytesInBlock,
compressedDataOffset, size);
buffer.getBytes(buffer.readerIndex(), compressedData, compressedDataOffset, bytesToRead);
buffer.skipBytes(bytesToRead);
compressedDataOffset += bytesToRead;
totalRead += bytesToRead;
if (i < breaks)
{
assert compressedDataOffset < size;
int b = buffer.readUnsignedByte();
++totalRead;
assert b == 0xff;
}
}
assert compressedDataOffset == size;
logger.trace("{}/{}: done downloading file, remaining buffer {}",
index, file,
buffer.readableBytes());
buffer.clear();
client.onFileFinish(index, file, compressedData);
while (readFile());
}
buffer.discardReadBytes();
ReferenceCountUtil.release(msg);
}
/** Calculate how many breaks there are in the file stream.
* There are calculateBreaks()+1 total chunks in the file stream
/**
 * Try to parse one complete file response out of the accumulated read
 * buffer.
 *
 * @return true if a full file was parsed and handed to the client,
 * false if more data must arrive before a complete response can be read
 */
private boolean readFile()
{
	// Need at least the 8 byte header: index (1), file (2),
	// compression (1), compressed size (4)
	if (buffer.readableBytes() < 8)
	{
		logger.trace("Connected, but not enough data yet to read header");
		return false;
	}

	// Peek at the header via a slice so the reader index is not
	// advanced until we know the whole payload has arrived.
	ByteBuf copy = buffer.slice();

	int index = copy.readUnsignedByte();
	int file = copy.readUnsignedShort();

	// decompress() starts reading here
	int compression = copy.readUnsignedByte();
	int compressedFileSize = copy.readInt();

	int size = compressedFileSize
		+ 5 // 1 byte compression type, 4 byte compressed size
		+ (compression != 0 ? 4 : 0); // compression has leading 4 byte decompressed length

	assert size > 0;

	// Number of 0xff chunk-separator bytes interleaved in the stream
	// (see calculateBreaks for the chunking scheme)
	int breaks = calculateBreaks(size);

	// 3 for index/file
	if (size + 3 + breaks > buffer.readableBytes())
	{
		logger.trace("Index {} archive {}: Not enough data yet {} > {}", index, file, size + 3 + breaks, buffer.readableBytes());
		return false;
	}

	byte[] compressedData = new byte[size];
	int compressedDataOffset = 0;
	int totalRead = 3;

	buffer.skipBytes(3); // skip index/file

	// Contents arrive in 512 byte chunks; each chunk after the first is
	// preceded by a 0xff marker byte, which is stripped here.
	for (int i = 0; i < breaks + 1; ++i)
	{
		int bytesInBlock = 512 - (totalRead % 512);
		int bytesToRead = Math.min(bytesInBlock, size - compressedDataOffset);

		logger.trace("{}/{}: reading block {}/{}, read so far this block: {}, file status: {}/{}",
			index, file,
			(totalRead % 512), 512,
			bytesInBlock,
			compressedDataOffset, size);

		// Copy without moving the reader index, then skip past the copied
		// bytes explicitly.
		buffer.getBytes(buffer.readerIndex(), compressedData, compressedDataOffset, bytesToRead);
		buffer.skipBytes(bytesToRead);

		compressedDataOffset += bytesToRead;
		totalRead += bytesToRead;

		if (i < breaks)
		{
			assert compressedDataOffset < size;

			// Consume the 0xff separator that starts the next chunk
			int b = buffer.readUnsignedByte();
			++totalRead;

			assert b == 0xff;
		}
	}

	assert compressedDataOffset == size;

	logger.trace("{}/{}: done downloading file, remaining buffer {}",
		index, file,
		buffer.readableBytes());

	client.onFileFinish(index, file, compressedData);
	return true;
}
/**
* Calculate how many breaks there are in the file stream. There are
* calculateBreaks()+1 total chunks in the file stream
*
* File contents are sent in 512 byte chunks, with the first byte of
* each chunk except for the first one being 0xff.
*
* The first chunk has an 8 byte header (index, file, compression,
* compressed size). So, the first chunk can contain 512 - 8 bytes
* of the file, and each chunk after 511 bytes.
* compressed size). So, the first chunk can contain 512 - 8 bytes of
* the file, and each chunk after 511 bytes.
*
* The 'size' parameter has the compression type and size included in
* it, since they haven't been read yet by the buffer stream, as
* decompress() reads it, so we use 512 - 3 (because 8-5) = 3
* decompress() reads it, so we use 512 - 3 (because 8-5) = 3
*/
private int calculateBreaks(int size)
{
int initialSize = 512 - 3;
if (size <= initialSize)
{
return 0; // First in the initial chunk, no breaks
}
int left = size - initialSize;
if (left % 511 == 0)
@@ -180,12 +198,6 @@ public class CacheClientHandler extends ChannelInboundHandlerAdapter
}
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx)
{
	// Flush any writes queued during this read burst.
	ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
{