diff --git a/runelite-client/src/main/java/net/runelite/client/eventbus/EventBus.java b/runelite-client/src/main/java/net/runelite/client/eventbus/EventBus.java index d273cf5411..b9908701f8 100644 --- a/runelite-client/src/main/java/net/runelite/client/eventbus/EventBus.java +++ b/runelite-client/src/main/java/net/runelite/client/eventbus/EventBus.java @@ -2,10 +2,14 @@ package net.runelite.client.eventbus; import com.jakewharton.rxrelay2.PublishRelay; import com.jakewharton.rxrelay2.Relay; +import io.reactivex.ObservableTransformer; +import io.reactivex.Scheduler; import io.reactivex.annotations.NonNull; +import io.reactivex.annotations.Nullable; import io.reactivex.disposables.CompositeDisposable; import io.reactivex.disposables.Disposable; import io.reactivex.functions.Consumer; +import io.reactivex.schedulers.Schedulers; import io.sentry.Sentry; import java.util.HashMap; import java.util.Map; @@ -15,6 +19,7 @@ import javax.inject.Singleton; import lombok.extern.slf4j.Slf4j; import net.runelite.api.events.Event; import net.runelite.client.RuneLiteProperties; +import net.runelite.client.callback.ClientThread; import net.runelite.client.config.OpenOSRSConfig; @Slf4j @@ -28,6 +33,9 @@ public class EventBus implements EventBusInterface @Inject private OpenOSRSConfig openOSRSConfig; + @Inject + private ClientThread clientThread; + @NonNull private Relay getSubject(Class eventClass) { @@ -47,34 +55,65 @@ public class EventBus implements EventBusInterface return compositeDisposable; } - @Override - // Subscribe on lifecycle (for example from plugin startUp -> shutdown) - public void subscribe(Class eventClass, @NonNull Object lifecycle, @NonNull Consumer action) + private ObservableTransformer applyTake(int until) { - if (subscriptionList.containsKey(lifecycle) && eventClass.equals(subscriptionList.get(lifecycle))) + return observable -> until > 0 ? observable.take(until) : observable; + } + + private Scheduler getScheduler(EventScheduler scheduler) + { + Scheduler subscribeScheduler; + switch (scheduler) { - return; + case COMPUTATION: + subscribeScheduler = Schedulers.computation(); + break; + case IO: + subscribeScheduler = Schedulers.io(); + break; + case NEWTHREAD: + subscribeScheduler = Schedulers.newThread(); + break; + case SINGLE: + subscribeScheduler = Schedulers.single(); + break; + case TRAMPOLINE: + subscribeScheduler = Schedulers.trampoline(); + break; + case CLIENT: + subscribeScheduler = Schedulers.from(clientThread); + break; + case DEFAULT: + default: + subscribeScheduler = null; + break; } - Disposable disposable = getSubject(eventClass) - .filter(Objects::nonNull) // Filter out null objects, better safe than sorry - .cast(eventClass) // Cast it for easier usage - .subscribe(action, error -> - { - log.error("Exception in eventbus", error); + return subscribeScheduler; + } - if (RuneLiteProperties.getLauncherVersion() != null && openOSRSConfig.shareLogs()) - { - Sentry.capture(error); - } - }); + private ObservableTransformer applyScheduler(EventScheduler eventScheduler, boolean subscribe) + { + Scheduler scheduler = getScheduler(eventScheduler); - getCompositeDisposable(lifecycle).add(disposable); - subscriptionList.put(lifecycle, eventClass); + return observable -> scheduler == null ? observable : subscribe ? observable.subscribeOn(scheduler) : observable.observeOn(scheduler); } @Override - public void subscribe(Class eventClass, @NonNull Object lifecycle, @NonNull Consumer action, int takeUntil) + public void subscribe(Class eventClass, @NonNull Object lifecycle, @NonNull Consumer action) + { + subscribe(eventClass, lifecycle, action, -1, EventScheduler.DEFAULT, EventScheduler.DEFAULT); + } + + @Override + public void subscribe(Class eventClass, @NonNull Object lifecycle, @NonNull Consumer action, int takeUtil) + { + subscribe(eventClass, lifecycle, action, takeUtil, EventScheduler.DEFAULT, EventScheduler.DEFAULT); + } + + @Override + // Subscribe on lifecycle (for example from plugin startUp -> shutdown) + public void subscribe(Class eventClass, @NonNull Object lifecycle, @NonNull Consumer action, int takeUntil, @Nullable EventScheduler subscribe, @Nullable EventScheduler observe) { if (subscriptionList.containsKey(lifecycle) && eventClass.equals(subscriptionList.get(lifecycle))) { @@ -82,10 +121,12 @@ public class EventBus implements EventBusInterface } Disposable disposable = getSubject(eventClass) + .compose(applyTake(takeUntil)) .filter(Objects::nonNull) // Filter out null objects, better safe than sorry .cast(eventClass) // Cast it for easier usage - .take(takeUntil) .doFinally(() -> unregister(lifecycle)) + .compose(applyScheduler(subscribe, true)) + .compose(applyScheduler(observe, false)) .subscribe(action, error -> { log.error("Exception in eventbus", error); diff --git a/runelite-client/src/main/java/net/runelite/client/eventbus/EventBusInterface.java b/runelite-client/src/main/java/net/runelite/client/eventbus/EventBusInterface.java index 5d6669c09e..9b6d66f51a 100644 --- a/runelite-client/src/main/java/net/runelite/client/eventbus/EventBusInterface.java +++ b/runelite-client/src/main/java/net/runelite/client/eventbus/EventBusInterface.java @@ -10,6 +10,8 @@ public interface EventBusInterface void subscribe(Class eventClass, @NonNull Object lifecycle, @NonNull Consumer action, int takeUntil); + void subscribe(Class eventClass, @NonNull Object lifecycle, @NonNull Consumer action, int takeUntil, EventScheduler subscribe, EventScheduler observe); + void unregister(@NonNull Object lifecycle); void post(Class eventClass, @NonNull T event); diff --git a/runelite-client/src/main/java/net/runelite/client/eventbus/EventScheduler.java b/runelite-client/src/main/java/net/runelite/client/eventbus/EventScheduler.java new file mode 100644 index 0000000000..7923cc17c8 --- /dev/null +++ b/runelite-client/src/main/java/net/runelite/client/eventbus/EventScheduler.java @@ -0,0 +1,21 @@ +package net.runelite.client.eventbus; + +import io.reactivex.annotations.Nullable; + +public enum EventScheduler +{ + DEFAULT(null), + COMPUTATION("computation"), + IO("io"), + NEWTHREAD("newThread"), + SINGLE("single"), + TRAMPOLINE("trampoline"), + CLIENT("client"); + + public final String scheduler; + + EventScheduler(@Nullable String scheduler) + { + this.scheduler = scheduler; + } +} diff --git a/runelite-client/src/main/java/net/runelite/client/eventbus/Subscribe.java b/runelite-client/src/main/java/net/runelite/client/eventbus/Subscribe.java index 26560ac6bf..cdecb670d8 100644 --- a/runelite-client/src/main/java/net/runelite/client/eventbus/Subscribe.java +++ b/runelite-client/src/main/java/net/runelite/client/eventbus/Subscribe.java @@ -36,4 +36,9 @@ import java.lang.annotation.Target; @Retention(RetentionPolicy.RUNTIME) @Target(ElementType.METHOD) @Documented -public @interface Subscribe {} +public @interface Subscribe +{ + int takeUntil() default -1; + EventScheduler subscribe() default EventScheduler.DEFAULT; + EventScheduler observe() default EventScheduler.DEFAULT; +} diff --git a/runelite-client/src/main/java/net/runelite/client/plugins/Plugin.java b/runelite-client/src/main/java/net/runelite/client/plugins/Plugin.java index 705637fe9f..6ff4642b8f 100644 --- a/runelite-client/src/main/java/net/runelite/client/plugins/Plugin.java +++ b/runelite-client/src/main/java/net/runelite/client/plugins/Plugin.java @@ -36,6 +36,7 @@ import lombok.Getter; import lombok.Value; import net.runelite.api.events.Event; import net.runelite.client.eventbus.EventBus; +import net.runelite.client.eventbus.EventScheduler; import net.runelite.client.eventbus.Subscribe; public abstract class Plugin implements Module @@ -62,7 +63,7 @@ public abstract class Plugin implements Module @SuppressWarnings("unchecked") final void addAnnotatedSubscriptions(EventBus eventBus) { - annotatedSubscriptions.forEach(sub -> eventBus.subscribe(sub.type, annotatedSubsLock, sub.method)); + annotatedSubscriptions.forEach(sub -> eventBus.subscribe(sub.type, annotatedSubsLock, sub.method, sub.takeUntil, sub.subscribe, sub.observe)); } final void removeAnnotatedSubscriptions(EventBus eventBus) @@ -76,8 +77,11 @@ public abstract class Plugin implements Module for (Method method : this.getClass().getDeclaredMethods()) { - if (method.getAnnotation(Subscribe.class) == null) + Subscribe annotation = method.getAnnotation(Subscribe.class); + if (annotation == null) + { continue; + } assert method.getParameterCount() == 1 : "Methods annotated with @Subscribe should have only one parameter"; @@ -88,7 +92,7 @@ public abstract class Plugin implements Module method.setAccessible(true); - Subscription sub = new Subscription(type.asSubclass(Event.class), event -> method.invoke(this, event)); + Subscription sub = new Subscription(type.asSubclass(Event.class), event -> method.invoke(this, event), annotation.takeUntil(), annotation.subscribe(), annotation.observe()); builder.add(sub); } @@ -101,5 +105,8 @@ public abstract class Plugin implements Module { private final Class type; private final Consumer method; + private final int takeUntil; + private final EventScheduler subscribe; + private final EventScheduler observe; } } diff --git a/runelite-client/src/main/java/net/runelite/client/plugins/defaultworld/DefaultWorldPlugin.java b/runelite-client/src/main/java/net/runelite/client/plugins/defaultworld/DefaultWorldPlugin.java index d10cd8dc20..c88e3e15eb 100644 --- a/runelite-client/src/main/java/net/runelite/client/plugins/defaultworld/DefaultWorldPlugin.java +++ b/runelite-client/src/main/java/net/runelite/client/plugins/defaultworld/DefaultWorldPlugin.java @@ -94,7 +94,7 @@ public class DefaultWorldPlugin extends Plugin applyWorld(); } - @Subscribe + @Subscribe(takeUntil = 2) private void onGameStateChanged(GameStateChanged event) { if (event.getGameState() == GameState.LOGGED_IN)