diff --git a/cache/src/main/java/net/runelite/cache/downloader/CacheClient.java b/cache/src/main/java/net/runelite/cache/downloader/CacheClient.java index 926deb79b9..e9a1d3329e 100644 --- a/cache/src/main/java/net/runelite/cache/downloader/CacheClient.java +++ b/cache/src/main/java/net/runelite/cache/downloader/CacheClient.java @@ -43,7 +43,6 @@ import java.util.ArrayDeque; import java.util.Queue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; -import net.runelite.cache.IndexType; import net.runelite.cache.downloader.requests.ConnectionInfo; import net.runelite.cache.downloader.requests.FileRequest; import net.runelite.cache.downloader.requests.HelloHandshake; @@ -60,7 +59,9 @@ public class CacheClient implements AutoCloseable private static final String HOST = "oldschool1.runescape.com"; private static final int PORT = 43594; - private static final int CLIENT_REVISION = 139; + private static final int CLIENT_REVISION = 142; + + private static final int MAX_REQUESTS = 19; // too many and the server closes the conncetion private final Store store; // store cache will be written to private final String host; @@ -175,7 +176,7 @@ public class CacheClient implements AutoCloseable { Stopwatch stopwatch = Stopwatch.createStarted(); - FileResult result = requestFile(255, 255).get(); + FileResult result = requestFile(255, 255, true).get(); result.decompress(null); ByteBuf buffer = Unpooled.wrappedBuffer(result.getContents()); @@ -212,7 +213,7 @@ public class CacheClient implements AutoCloseable logger.info("Downloading index {}", i); - FileResult indexFileResult = requestFile(255, i).get(); + FileResult indexFileResult = requestFile(255, i, true).get(); indexFileResult.decompress(null); logger.info("Downloaded index {}", i); @@ -261,14 +262,12 @@ public class CacheClient implements AutoCloseable oldArchive.getRevision(), archive.getRevision()); } - FileResult archiveFileResult = requestFile(index.getId(), archive.getArchiveId()).get(); - byte[] compressedContents = archiveFileResult.getCompressedData(); - - archive.setData(compressedContents); - if (index.getId() != IndexType.MAPS.getNumber()) + CompletableFuture future = requestFile(index.getId(), archive.getArchiveId(), false); + future.handle((fr, ex) -> { - archive.decompressAndLoad(null); - } + archive.setData(fr.getCompressedData()); + return null; + }); } else { @@ -290,32 +289,68 @@ public class CacheClient implements AutoCloseable } } + // flush any pending requests + channel.flush(); + + while (!requests.isEmpty()) + { + // wait for pending requests + synchronized (this) + { + wait(); + } + } + stopwatch.stop(); logger.info("Download completed in {}", stopwatch); } - public synchronized CompletableFuture requestFile(int index, int fileId) + public synchronized CompletableFuture requestFile(int index, int fileId, boolean flush) { if (state != ClientState.CONNECTED) { throw new IllegalStateException("Can't request files until connected!"); } + if (!flush) + { + while (requests.size() >= MAX_REQUESTS) + { + channel.flush(); + + try + { + wait(); + } + catch (InterruptedException ex) + { + logger.warn("interrupted while waiting for requests", ex); + } + } + } + ByteBuf buf = Unpooled.buffer(4); FileRequest request = new FileRequest(index, fileId); CompletableFuture future = new CompletableFuture<>(); PendingFileRequest pf = new PendingFileRequest(request, future); - buf.writeByte(request.getIndex() == 255 ? 1 : 0); + buf.writeByte(0); // 0/1 - seems to be a priority? int hash = pf.computeHash(); buf.writeMedium(hash); logger.trace("Sending request for {}/{}", index, fileId); - channel.writeAndFlush(buf); - requests.add(pf); + if (!flush) + { + channel.write(buf); + } + else + { + channel.writeAndFlush(buf); + } + return future; } @@ -343,6 +378,8 @@ public class CacheClient implements AutoCloseable requests.remove(pr); + notify(); + FileResult result = new FileResult(index, file, compressedData); logger.debug("File download finished for index {} file {}, length {}", index, file, compressedData.length); diff --git a/cache/src/main/java/net/runelite/cache/downloader/CacheClientHandler.java b/cache/src/main/java/net/runelite/cache/downloader/CacheClientHandler.java index 5ddc62df3d..ba54a591d8 100644 --- a/cache/src/main/java/net/runelite/cache/downloader/CacheClientHandler.java +++ b/cache/src/main/java/net/runelite/cache/downloader/CacheClientHandler.java @@ -22,7 +22,6 @@ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */ - package net.runelite.cache.downloader; import io.netty.buffer.ByteBuf; @@ -46,6 +45,12 @@ public class CacheClientHandler extends ChannelInboundHandlerAdapter this.client = client; } + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception + { + logger.warn("Channel has gone inactive"); + } + @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { @@ -63,108 +68,121 @@ public class CacheClientHandler extends ChannelInboundHandlerAdapter if (response != HelloHandshake.RESPONSE_OK) { if (response == HelloHandshake.RESPONSE_OUTDATED) + { logger.warn("Client version is outdated"); + } else + { logger.warn("Handshake response error {}", response); + } } client.onHandshake(response); } else if (state == ClientState.CONNECTED) { - if (buffer.readableBytes() < 8) - { - logger.trace("Connected, but not enough data yet to read header"); - return; - } - - ByteBuf copy = buffer.slice(); - - int index = copy.readUnsignedByte(); - int file = copy.readUnsignedShort(); - // decompress() starts reading here - int compression = copy.readUnsignedByte(); - int compressedFileSize = copy.readInt(); - - int size = compressedFileSize - + 5 // 1 byte compresion type, 4 byte compressed size - + (compression != 0 ? 4 : 0); // compression has leading 4 byte decompressed length - - int breaks = calculateBreaks(size); - - // 3 for index/file - if (size + 3 + breaks > buffer.readableBytes()) - { - logger.trace("Index {} archive {}: Not enough data yet {} > {}", index, file, size + 3 + breaks, buffer.readableBytes()); - return; - } - - byte[] compressedData = new byte[size]; - int compressedDataOffset = 0; - - int totalRead = 3; - buffer.skipBytes(3); // skip index/file - - for (int i = 0; i < breaks + 1; ++i) - { - int bytesInBlock = 512 - (totalRead % 512); - int bytesToRead = Math.min(bytesInBlock, size - compressedDataOffset); - - logger.trace("{}/{}: reading block {}/{}, read so far this block: {}, file status: {}/{}", - index, file, - (totalRead % 512), 512, - bytesInBlock, - compressedDataOffset, size); - - buffer.getBytes(buffer.readerIndex(), compressedData, compressedDataOffset, bytesToRead); - buffer.skipBytes(bytesToRead); - - compressedDataOffset += bytesToRead; - totalRead += bytesToRead; - - if (i < breaks) - { - assert compressedDataOffset < size; - int b = buffer.readUnsignedByte(); - ++totalRead; - assert b == 0xff; - } - } - - assert compressedDataOffset == size; - - logger.trace("{}/{}: done downloading file, remaining buffer {}", - index, file, - buffer.readableBytes()); - buffer.clear(); - - client.onFileFinish(index, file, compressedData); + while (readFile()); } buffer.discardReadBytes(); ReferenceCountUtil.release(msg); } - /** Calculate how many breaks there are in the file stream. - * There are calculateBreaks()+1 total chunks in the file stream + private boolean readFile() + { + if (buffer.readableBytes() < 8) + { + logger.trace("Connected, but not enough data yet to read header"); + return false; + } + + ByteBuf copy = buffer.slice(); + + int index = copy.readUnsignedByte(); + int file = copy.readUnsignedShort(); + // decompress() starts reading here + int compression = copy.readUnsignedByte(); + int compressedFileSize = copy.readInt(); + + int size = compressedFileSize + + 5 // 1 byte compresion type, 4 byte compressed size + + (compression != 0 ? 4 : 0); // compression has leading 4 byte decompressed length + + assert size > 0; + + int breaks = calculateBreaks(size); + + // 3 for index/file + if (size + 3 + breaks > buffer.readableBytes()) + { + logger.trace("Index {} archive {}: Not enough data yet {} > {}", index, file, size + 3 + breaks, buffer.readableBytes()); + return false; + } + + byte[] compressedData = new byte[size]; + int compressedDataOffset = 0; + + int totalRead = 3; + buffer.skipBytes(3); // skip index/file + + for (int i = 0; i < breaks + 1; ++i) + { + int bytesInBlock = 512 - (totalRead % 512); + int bytesToRead = Math.min(bytesInBlock, size - compressedDataOffset); + + logger.trace("{}/{}: reading block {}/{}, read so far this block: {}, file status: {}/{}", + index, file, + (totalRead % 512), 512, + bytesInBlock, + compressedDataOffset, size); + + buffer.getBytes(buffer.readerIndex(), compressedData, compressedDataOffset, bytesToRead); + buffer.skipBytes(bytesToRead); + + compressedDataOffset += bytesToRead; + totalRead += bytesToRead; + + if (i < breaks) + { + assert compressedDataOffset < size; + int b = buffer.readUnsignedByte(); + ++totalRead; + assert b == 0xff; + } + } + + assert compressedDataOffset == size; + + logger.trace("{}/{}: done downloading file, remaining buffer {}", + index, file, + buffer.readableBytes()); + + client.onFileFinish(index, file, compressedData); + return true; + } + + /** + * Calculate how many breaks there are in the file stream. There are + * calculateBreaks()+1 total chunks in the file stream * * File contents are sent in 512 byte chunks, with the first byte of * each chunk except for the first one being 0xff. * * The first chunk has an 8 byte header (index, file, compression, - * compressed size). So, the first chunk can contain 512 - 8 bytes - * of the file, and each chunk after 511 bytes. + * compressed size). So, the first chunk can contain 512 - 8 bytes of + * the file, and each chunk after 511 bytes. * * The 'size' parameter has the compression type and size included in * it, since they haven't been read yet by the buffer stream, as - * decompress() reads it, so we use 512 - 3 (because 8-5) = 3 + * decompress() reads it, so we use 512 - 3 (because 8-5) = 3 */ private int calculateBreaks(int size) { int initialSize = 512 - 3; if (size <= initialSize) + { return 0; // First in the initial chunk, no breaks - + } int left = size - initialSize; if (left % 511 == 0) @@ -180,12 +198,6 @@ public class CacheClientHandler extends ChannelInboundHandlerAdapter } } - @Override - public void channelReadComplete(ChannelHandlerContext ctx) - { - ctx.flush(); - } - @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {