cache: fix cache server to buffer stream socket data, this works with vanilla client
This commit is contained in:
@@ -49,6 +49,7 @@ public class CacheServerHandler extends SimpleChannelInboundHandler<ByteBuf>
|
|||||||
private final Store store;
|
private final Store store;
|
||||||
|
|
||||||
private ClientState state = ClientState.HANDSHAKING;
|
private ClientState state = ClientState.HANDSHAKING;
|
||||||
|
private final ByteBuf buffer = Unpooled.buffer();
|
||||||
|
|
||||||
public CacheServerHandler(CacheServer server)
|
public CacheServerHandler(CacheServer server)
|
||||||
{
|
{
|
||||||
@@ -61,22 +62,37 @@ public class CacheServerHandler extends SimpleChannelInboundHandler<ByteBuf>
|
|||||||
{
|
{
|
||||||
System.out.println(ByteBufUtil.prettyHexDump(buf));
|
System.out.println(ByteBufUtil.prettyHexDump(buf));
|
||||||
|
|
||||||
switch (state)
|
buffer.writeBytes(buf);
|
||||||
|
|
||||||
|
int last = -1;
|
||||||
|
while (buffer.readableBytes() != last)
|
||||||
{
|
{
|
||||||
case HANDSHAKING:
|
last = buffer.readableBytes();
|
||||||
handshake(ctx, buf);
|
|
||||||
break;
|
switch (state)
|
||||||
case CONNECTING:
|
{
|
||||||
connecting(ctx, buf);
|
case HANDSHAKING:
|
||||||
break;
|
handshake(ctx, buffer);
|
||||||
case CONNECTED:
|
break;
|
||||||
connected(ctx, buf);
|
case CONNECTING:
|
||||||
break;
|
connecting(ctx, buffer);
|
||||||
|
break;
|
||||||
|
case CONNECTED:
|
||||||
|
connected(ctx, buffer);
|
||||||
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
buffer.discardReadBytes();
|
||||||
}
|
}
|
||||||
|
|
||||||
private void handshake(ChannelHandlerContext ctx, ByteBuf buf)
|
private void handshake(ChannelHandlerContext ctx, ByteBuf buf)
|
||||||
{
|
{
|
||||||
|
if (buf.readableBytes() < 5)
|
||||||
|
{
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
byte type = buf.readByte();
|
byte type = buf.readByte();
|
||||||
if (type != 15)
|
if (type != 15)
|
||||||
{
|
{
|
||||||
@@ -105,6 +121,11 @@ public class CacheServerHandler extends SimpleChannelInboundHandler<ByteBuf>
|
|||||||
|
|
||||||
private void connecting(ChannelHandlerContext ctx, ByteBuf buf)
|
private void connecting(ChannelHandlerContext ctx, ByteBuf buf)
|
||||||
{
|
{
|
||||||
|
if (buf.readableBytes() < 4)
|
||||||
|
{
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
ConnectionInfo cinfo = new ConnectionInfo();
|
ConnectionInfo cinfo = new ConnectionInfo();
|
||||||
cinfo.setType(buf.readByte());
|
cinfo.setType(buf.readByte());
|
||||||
cinfo.setPadding(buf.readMedium());
|
cinfo.setPadding(buf.readMedium());
|
||||||
@@ -121,13 +142,18 @@ public class CacheServerHandler extends SimpleChannelInboundHandler<ByteBuf>
|
|||||||
// byte[1] = index
|
// byte[1] = index
|
||||||
// byte[2-3] = archive id
|
// byte[2-3] = archive id
|
||||||
|
|
||||||
|
if (buf.readableBytes() < 4)
|
||||||
|
{
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
byte requesting255 = buf.readByte();
|
byte requesting255 = buf.readByte();
|
||||||
int index = buf.readByte() & 0xFF;
|
int index = buf.readByte() & 0xFF;
|
||||||
int archiveId = buf.readShort() & 0xFFFF;
|
int archiveId = buf.readShort() & 0xFFFF;
|
||||||
|
|
||||||
if (requesting255 != 0)
|
if (index == 255)
|
||||||
{
|
{
|
||||||
handle255(ctx, index, archiveId);
|
handle255(ctx, requesting255, index, archiveId);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
@@ -135,9 +161,9 @@ public class CacheServerHandler extends SimpleChannelInboundHandler<ByteBuf>
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void handle255(ChannelHandlerContext ctx, int index, int archiveId)
|
private void handle255(ChannelHandlerContext ctx, byte requesting255, int index, int archiveId)
|
||||||
{
|
{
|
||||||
logger.info("Client {} requests 255, index {}, archive {}", ctx.channel().remoteAddress(), index, archiveId);
|
logger.info("Client {} requests 255 {}, index {}, archive {}", ctx.channel().remoteAddress(), requesting255, index, archiveId);
|
||||||
|
|
||||||
if (archiveId == 255)
|
if (archiveId == 255)
|
||||||
{
|
{
|
||||||
|
|||||||
@@ -89,14 +89,27 @@ public class StoreLoadTest
|
|||||||
@Ignore
|
@Ignore
|
||||||
public void loadTree() throws IOException
|
public void loadTree() throws IOException
|
||||||
{
|
{
|
||||||
Store store = new Store(folder.newFolder());
|
try (Store store = new Store(folder.newFolder()))
|
||||||
store.loadTree(new java.io.File("C:\\rs\\temp\\tree"));
|
|
||||||
|
|
||||||
try (Store store2 = new Store(StoreLocation.LOCATION))
|
|
||||||
{
|
{
|
||||||
store2.load();
|
store.loadTree(new java.io.File("C:\\rs\\temp\\tree"));
|
||||||
|
|
||||||
|
try (Store store2 = new Store(StoreLocation.LOCATION))
|
||||||
|
{
|
||||||
|
store2.load();
|
||||||
|
|
||||||
Assert.assertEquals(store, store2);
|
Assert.assertEquals(store, store2);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
@Ignore
|
||||||
|
public void saveTree() throws IOException
|
||||||
|
{
|
||||||
|
try (Store store = new Store(new java.io.File("d:/rs/07/temp/cache")))
|
||||||
|
{
|
||||||
|
store.load();
|
||||||
|
store.saveTree(new java.io.File("d:/rs/07/temp/tree"));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -45,6 +45,21 @@ public class CacheServerTest
|
|||||||
@Rule
|
@Rule
|
||||||
public TemporaryFolder folder = StoreLocation.getTemporaryFolder();
|
public TemporaryFolder folder = StoreLocation.getTemporaryFolder();
|
||||||
|
|
||||||
|
@Test
|
||||||
|
@Ignore
|
||||||
|
public void run() throws Exception
|
||||||
|
{
|
||||||
|
try (Store store = new Store(new java.io.File("D:\\rs\\07\\temp\\cache139"));
|
||||||
|
CacheServer server = new CacheServer(store, REVISION))
|
||||||
|
{
|
||||||
|
store.load();
|
||||||
|
store.rebuildCrc();
|
||||||
|
|
||||||
|
server.start();
|
||||||
|
server.waitForClose();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@Ignore
|
@Ignore
|
||||||
public void testDownload() throws Exception
|
public void testDownload() throws Exception
|
||||||
|
|||||||
Reference in New Issue
Block a user