Merge pull request #2076 from Owain94/eventbus-stuff

eventbus: Multithreading support
This commit is contained in:
Owain van Brakel
2019-11-27 16:01:45 +01:00
committed by GitHub
6 changed files with 99 additions and 25 deletions

View File

@@ -2,10 +2,14 @@ package net.runelite.client.eventbus;
import com.jakewharton.rxrelay2.PublishRelay; import com.jakewharton.rxrelay2.PublishRelay;
import com.jakewharton.rxrelay2.Relay; import com.jakewharton.rxrelay2.Relay;
import io.reactivex.ObservableTransformer;
import io.reactivex.Scheduler;
import io.reactivex.annotations.NonNull; import io.reactivex.annotations.NonNull;
import io.reactivex.annotations.Nullable;
import io.reactivex.disposables.CompositeDisposable; import io.reactivex.disposables.CompositeDisposable;
import io.reactivex.disposables.Disposable; import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Consumer; import io.reactivex.functions.Consumer;
import io.reactivex.schedulers.Schedulers;
import io.sentry.Sentry; import io.sentry.Sentry;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
@@ -15,6 +19,7 @@ import javax.inject.Singleton;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import net.runelite.api.events.Event; import net.runelite.api.events.Event;
import net.runelite.client.RuneLiteProperties; import net.runelite.client.RuneLiteProperties;
import net.runelite.client.callback.ClientThread;
import net.runelite.client.config.OpenOSRSConfig; import net.runelite.client.config.OpenOSRSConfig;
@Slf4j @Slf4j
@@ -28,6 +33,9 @@ public class EventBus implements EventBusInterface
@Inject @Inject
private OpenOSRSConfig openOSRSConfig; private OpenOSRSConfig openOSRSConfig;
@Inject
private ClientThread clientThread;
@NonNull @NonNull
private <T extends Event> Relay<Object> getSubject(Class<T> eventClass) private <T extends Event> Relay<Object> getSubject(Class<T> eventClass)
{ {
@@ -47,34 +55,65 @@ public class EventBus implements EventBusInterface
return compositeDisposable; return compositeDisposable;
} }
@Override private <T> ObservableTransformer<T, T> applyTake(int until)
// Subscribe on lifecycle (for example from plugin startUp -> shutdown)
public <T extends Event> void subscribe(Class<T> eventClass, @NonNull Object lifecycle, @NonNull Consumer<T> action)
{ {
if (subscriptionList.containsKey(lifecycle) && eventClass.equals(subscriptionList.get(lifecycle))) return observable -> until > 0 ? observable.take(until) : observable;
}
private Scheduler getScheduler(EventScheduler scheduler)
{
Scheduler subscribeScheduler;
switch (scheduler)
{ {
return; case COMPUTATION:
subscribeScheduler = Schedulers.computation();
break;
case IO:
subscribeScheduler = Schedulers.io();
break;
case NEWTHREAD:
subscribeScheduler = Schedulers.newThread();
break;
case SINGLE:
subscribeScheduler = Schedulers.single();
break;
case TRAMPOLINE:
subscribeScheduler = Schedulers.trampoline();
break;
case CLIENT:
subscribeScheduler = Schedulers.from(clientThread);
break;
case DEFAULT:
default:
subscribeScheduler = null;
break;
} }
Disposable disposable = getSubject(eventClass) return subscribeScheduler;
.filter(Objects::nonNull) // Filter out null objects, better safe than sorry }
.cast(eventClass) // Cast it for easier usage
.subscribe(action, error ->
{
log.error("Exception in eventbus", error);
if (RuneLiteProperties.getLauncherVersion() != null && openOSRSConfig.shareLogs()) private <T> ObservableTransformer<T, T> applyScheduler(EventScheduler eventScheduler, boolean subscribe)
{ {
Sentry.capture(error); Scheduler scheduler = getScheduler(eventScheduler);
}
});
getCompositeDisposable(lifecycle).add(disposable); return observable -> scheduler == null ? observable : subscribe ? observable.subscribeOn(scheduler) : observable.observeOn(scheduler);
subscriptionList.put(lifecycle, eventClass);
} }
@Override @Override
public <T extends Event> void subscribe(Class<T> eventClass, @NonNull Object lifecycle, @NonNull Consumer<T> action, int takeUntil) public <T extends Event> void subscribe(Class<T> eventClass, @NonNull Object lifecycle, @NonNull Consumer<T> action)
{
subscribe(eventClass, lifecycle, action, -1, EventScheduler.DEFAULT, EventScheduler.DEFAULT);
}
@Override
public <T extends Event> void subscribe(Class<T> eventClass, @NonNull Object lifecycle, @NonNull Consumer<T> action, int takeUtil)
{
subscribe(eventClass, lifecycle, action, takeUtil, EventScheduler.DEFAULT, EventScheduler.DEFAULT);
}
@Override
// Subscribe on lifecycle (for example from plugin startUp -> shutdown)
public <T extends Event> void subscribe(Class<T> eventClass, @NonNull Object lifecycle, @NonNull Consumer<T> action, int takeUntil, @Nullable EventScheduler subscribe, @Nullable EventScheduler observe)
{ {
if (subscriptionList.containsKey(lifecycle) && eventClass.equals(subscriptionList.get(lifecycle))) if (subscriptionList.containsKey(lifecycle) && eventClass.equals(subscriptionList.get(lifecycle)))
{ {
@@ -82,10 +121,12 @@ public class EventBus implements EventBusInterface
} }
Disposable disposable = getSubject(eventClass) Disposable disposable = getSubject(eventClass)
.compose(applyTake(takeUntil))
.filter(Objects::nonNull) // Filter out null objects, better safe than sorry .filter(Objects::nonNull) // Filter out null objects, better safe than sorry
.cast(eventClass) // Cast it for easier usage .cast(eventClass) // Cast it for easier usage
.take(takeUntil)
.doFinally(() -> unregister(lifecycle)) .doFinally(() -> unregister(lifecycle))
.compose(applyScheduler(subscribe, true))
.compose(applyScheduler(observe, false))
.subscribe(action, error -> .subscribe(action, error ->
{ {
log.error("Exception in eventbus", error); log.error("Exception in eventbus", error);

View File

@@ -10,6 +10,8 @@ public interface EventBusInterface
<T extends Event> void subscribe(Class<T> eventClass, @NonNull Object lifecycle, @NonNull Consumer<T> action, int takeUntil); <T extends Event> void subscribe(Class<T> eventClass, @NonNull Object lifecycle, @NonNull Consumer<T> action, int takeUntil);
<T extends Event> void subscribe(Class<T> eventClass, @NonNull Object lifecycle, @NonNull Consumer<T> action, int takeUntil, EventScheduler subscribe, EventScheduler observe);
void unregister(@NonNull Object lifecycle); void unregister(@NonNull Object lifecycle);
<T extends Event> void post(Class<? extends T> eventClass, @NonNull T event); <T extends Event> void post(Class<? extends T> eventClass, @NonNull T event);

View File

@@ -0,0 +1,21 @@
package net.runelite.client.eventbus;
import io.reactivex.annotations.Nullable;
public enum EventScheduler
{
DEFAULT(null),
COMPUTATION("computation"),
IO("io"),
NEWTHREAD("newThread"),
SINGLE("single"),
TRAMPOLINE("trampoline"),
CLIENT("client");
public final String scheduler;
EventScheduler(@Nullable String scheduler)
{
this.scheduler = scheduler;
}
}

View File

@@ -36,4 +36,9 @@ import java.lang.annotation.Target;
@Retention(RetentionPolicy.RUNTIME) @Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD) @Target(ElementType.METHOD)
@Documented @Documented
public @interface Subscribe {} public @interface Subscribe
{
int takeUntil() default -1;
EventScheduler subscribe() default EventScheduler.DEFAULT;
EventScheduler observe() default EventScheduler.DEFAULT;
}

View File

@@ -36,6 +36,7 @@ import lombok.Getter;
import lombok.Value; import lombok.Value;
import net.runelite.api.events.Event; import net.runelite.api.events.Event;
import net.runelite.client.eventbus.EventBus; import net.runelite.client.eventbus.EventBus;
import net.runelite.client.eventbus.EventScheduler;
import net.runelite.client.eventbus.Subscribe; import net.runelite.client.eventbus.Subscribe;
public abstract class Plugin implements Module public abstract class Plugin implements Module
@@ -62,7 +63,7 @@ public abstract class Plugin implements Module
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
final void addAnnotatedSubscriptions(EventBus eventBus) final void addAnnotatedSubscriptions(EventBus eventBus)
{ {
annotatedSubscriptions.forEach(sub -> eventBus.subscribe(sub.type, annotatedSubsLock, sub.method)); annotatedSubscriptions.forEach(sub -> eventBus.subscribe(sub.type, annotatedSubsLock, sub.method, sub.takeUntil, sub.subscribe, sub.observe));
} }
final void removeAnnotatedSubscriptions(EventBus eventBus) final void removeAnnotatedSubscriptions(EventBus eventBus)
@@ -76,7 +77,8 @@ public abstract class Plugin implements Module
for (Method method : this.getClass().getDeclaredMethods()) for (Method method : this.getClass().getDeclaredMethods())
{ {
if (method.getAnnotation(Subscribe.class) == null) Subscribe annotation = method.getAnnotation(Subscribe.class);
if (annotation == null)
{ {
continue; continue;
} }
@@ -90,7 +92,7 @@ public abstract class Plugin implements Module
method.setAccessible(true); method.setAccessible(true);
Subscription sub = new Subscription(type.asSubclass(Event.class), event -> method.invoke(this, event)); Subscription sub = new Subscription(type.asSubclass(Event.class), event -> method.invoke(this, event), annotation.takeUntil(), annotation.subscribe(), annotation.observe());
builder.add(sub); builder.add(sub);
} }
@@ -103,5 +105,8 @@ public abstract class Plugin implements Module
{ {
private final Class type; private final Class type;
private final Consumer method; private final Consumer method;
private final int takeUntil;
private final EventScheduler subscribe;
private final EventScheduler observe;
} }
} }

View File

@@ -94,7 +94,7 @@ public class DefaultWorldPlugin extends Plugin
applyWorld(); applyWorld();
} }
@Subscribe @Subscribe(takeUntil = 2)
private void onGameStateChanged(GameStateChanged event) private void onGameStateChanged(GameStateChanged event)
{ {
if (event.getGameState() == GameState.LOGGED_IN) if (event.getGameState() == GameState.LOGGED_IN)