replace singletons

main
jbb01 10 months ago committed by jbb01
parent eadf1eaf5b
commit 0477142233

@ -14,7 +14,7 @@ import java.util.List;
public interface ChatBotSupportMXBean {
@SneakyThrows
static @NotNull ObjectName getObjectName(String name) {
static @NotNull ObjectName getObjectName(@NotNull String name) {
return ObjectName.getInstance(STR."eu.jonahbauer.chat.bot.server:component=ChatBots,name=\{quote(name)}");
}

@ -1,30 +1,32 @@
package eu.jonahbauer.chat.server;
import eu.jonahbauer.chat.server.bot.ChatBotSupervisor;
import eu.jonahbauer.chat.server.management.impl.ChatBotManager;
import eu.jonahbauer.chat.server.management.impl.SocketManager;
import eu.jonahbauer.chat.server.socket.SocketSupervisor;
import eu.jonahbauer.chat.server.util.Lazy.MutableLazy;
import javax.management.JMException;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
public class Main {
public static void main(String[] args) throws IOException, InterruptedException {
// initialize ChatBotManager and SocketManager
var _ = ChatBotManager.INSTANCE;
var _ = SocketManager.INSTANCE;
public static void main(String[] args) throws IOException, InterruptedException, JMException {
var config = Config.load();
ChatBotSupervisor.INSTANCE.start(config);
SocketSupervisor.INSTANCE.start(config);
var chatBotSupervisorLazy = new MutableLazy<ChatBotSupervisor>();
var socketSupervisorLazy = new MutableLazy<SocketSupervisor>();
try (
var chatBotSupervisor = chatBotSupervisorLazy.set(new ChatBotSupervisor(socketSupervisorLazy));
var socketSupervisor = socketSupervisorLazy.set(new SocketSupervisor(chatBotSupervisorLazy))
) {
chatBotSupervisor.start(config);
socketSupervisor.start(config);
try {
// keep main thread running
new CountDownLatch(1).await();
} catch (InterruptedException e) {
SocketSupervisor.INSTANCE.stop();
ChatBotSupervisor.INSTANCE.stop();
try {
// keep main thread running
new CountDownLatch(1).await();
} catch (InterruptedException _) {
// ignore
}
}
}
}

@ -6,32 +6,43 @@ import eu.jonahbauer.chat.bot.config.BotConfig;
import eu.jonahbauer.chat.bot.impl.BotCreationException;
import eu.jonahbauer.chat.bot.impl.ChatBotFactory;
import eu.jonahbauer.chat.server.Config;
import eu.jonahbauer.chat.server.management.impl.ChatBotManager;
import eu.jonahbauer.chat.server.management.impl.ChatBotSupport;
import eu.jonahbauer.chat.server.socket.SocketSupervisor;
import eu.jonahbauer.chat.server.util.Lazy;
import lombok.extern.log4j.Log4j2;
import org.jetbrains.annotations.Blocking;
import org.jetbrains.annotations.Contract;
import org.jetbrains.annotations.NonBlocking;
import org.jetbrains.annotations.NotNull;
import javax.management.JMException;
import java.time.Duration;
import java.time.Instant;
import java.util.*;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
@Log4j2
public enum ChatBotSupervisor {
INSTANCE;
public final class ChatBotSupervisor implements AutoCloseable {
private static final Duration POST_EXPIRATION = Duration.ofSeconds(2);
private static final int MESSAGE_QUEUE_SIZE = 10;
/**
* Map of running bots indexed by their name.
*/
private final @NotNull Map<@NotNull String, @NotNull RunningBot> bots = new HashMap<>();
private final @NotNull ConcurrentMap<@NotNull String, @NotNull RunningBot> bots = new ConcurrentHashMap<>();
private final @NotNull ReentrantLock lock = new ReentrantLock();
private final @NotNull Lazy<SocketSupervisor> socketSupervisor;
public ChatBotSupervisor(@NotNull Lazy<SocketSupervisor> socketSupervisor) throws JMException {
ChatBotManager.init(this);
this.socketSupervisor = socketSupervisor;
}
/**
* Dispatches the message to all running bots. If a bot has too many messages queued up, the message may not be
@ -52,9 +63,16 @@ public enum ChatBotSupervisor {
* @param config a configuration
* @throws IllegalStateException if any bots are running already
*/
public synchronized void start(@NotNull Config config) {
if (!bots.isEmpty()) throw new IllegalStateException("start(Config) may not be used when any bots are running already");
config.bots().forEach(this::start);
public void start(@NotNull Config config) {
lock.lock();
try {
if (!bots.isEmpty()) {
throw new IllegalStateException("start(Config) may not be used when any bots are running already");
}
config.bots().forEach(this::start);
} finally {
lock.unlock();
}
}
/**
@ -64,7 +82,7 @@ public enum ChatBotSupervisor {
* @param channels the channels the bot will be active in
* @throws BotCreationException if the bot could not be created
*/
public synchronized void start(@NotNull String name, @NotNull String type, @NotNull List<String> channels) {
public void start(@NotNull String name, @NotNull String type, @NotNull List<String> channels) {
start(name, BotConfig.builder().type(type).channels(channels).build());
}
@ -74,9 +92,16 @@ public enum ChatBotSupervisor {
* @param config the bot configuration
* @throws BotCreationException if the bot could not be created
*/
public synchronized void start(@NotNull String name, @NotNull BotConfig config) {
if (bots.containsKey(name)) throw new BotCreationException("Duplicate bot name: " + name);
bots.put(name, new RunningBot(name, config));
public void start(@NotNull String name, @NotNull BotConfig config) {
lock.lock();
try {
if (bots.containsKey(name)) {
throw new BotCreationException("Duplicate bot name: " + name);
}
bots.put(name, new RunningBot(name, config));
} finally {
lock.unlock();
}
}
/**
@ -87,10 +112,13 @@ public enum ChatBotSupervisor {
public void stop(@NotNull String name) throws InterruptedException {
RunningBot bot;
synchronized (this) {
lock.lock();
try {
bot = bots.remove(name);
if (bot == null) return;
bot.stop();
} finally {
lock.unlock();
}
bot.join();
@ -103,12 +131,15 @@ public enum ChatBotSupervisor {
public void stop() throws InterruptedException {
var stopped = new ArrayList<RunningBot>();
synchronized (this) {
lock.lock();
try {
for (RunningBot bot : bots.values()) {
stopped.add(bot);
bot.stop();
}
bots.clear();
} finally {
lock.unlock();
}
for (var bot : stopped) {
@ -116,6 +147,11 @@ public enum ChatBotSupervisor {
}
}
@Override
public void close() throws InterruptedException {
stop();
}
/**
* {@return a map of all currently running bots and their effective config}
*/
@ -144,10 +180,10 @@ public enum ChatBotSupervisor {
/**
* Manages a {@link ChatBot} instance running on its own thread. Takes care of recreating the bot after it threw
* an exception. {@linkplain ChatBotSupport#register(String, BotConfig) Registers} the bot during construction
* and {@linkplain ChatBotSupport#unregister(String) unregisters} it during {@link #stop()}.
* an exception. {@linkplain ChatBotSupport#register(ChatBotSupervisor, String, BotConfig) Registers} the bot
* during construction and {@linkplain ChatBotSupport#unregister(String) unregisters} it during {@link #stop()}.
*/
private static class RunningBot implements Runnable {
private class RunningBot implements Runnable {
private final @NotNull String name;
private final @NotNull BotConfig config;
private final @NotNull ChatBotFactory<?> factory;
@ -163,7 +199,7 @@ public enum ChatBotSupervisor {
this.factory = getChatBotFactory(config.getType());
log.info("starting bot {}...", name);
ChatBotSupport.register(name, config);
ChatBotSupport.register(ChatBotSupervisor.this, name, config);
this.thread = Thread.ofVirtual().name("ChatBot[" + name + "]").start(this);
}
@ -200,7 +236,7 @@ public enum ChatBotSupervisor {
} else if (!bot.getConfig().getChannels().contains(post.post().channel())) {
log.debug("Ignoring message {} because channel is not listened to.", post.post());
} else {
bot.onMessage(SocketSupervisor.INSTANCE, post.post());
bot.onMessage(ChatBotSupervisor.this.socketSupervisor.get(), post.post());
}
} catch (InterruptedException _) {
}

@ -5,6 +5,8 @@ import eu.jonahbauer.chat.server.management.annotations.ManagedBean;
import eu.jonahbauer.chat.server.management.annotations.ManagedOperation;
import eu.jonahbauer.chat.server.management.annotations.ManagedParameter;
import lombok.extern.log4j.Log4j2;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import javax.management.*;
import java.lang.reflect.Method;
@ -15,13 +17,13 @@ import java.util.Locale;
import java.util.function.Function;
@Log4j2
public class AdvancedMBean extends StandardMBean {
public <T> AdvancedMBean(T implementation) {
class AdvancedMBean extends StandardMBean {
public AdvancedMBean(@NotNull Object implementation) {
super(implementation, null, true);
}
@Override
protected String getDescription(MBeanInfo info) {
protected @Nullable String getDescription(@Nullable MBeanInfo info) {
var clazz = getMBeanInterface();
var annotation = clazz.getAnnotation(ManagedBean.class);
if (annotation != null && !"".equals(annotation.description())) {
@ -32,9 +34,7 @@ public class AdvancedMBean extends StandardMBean {
}
@Override
protected int getImpact(MBeanOperationInfo info) {
if (info == null) return MBeanOperationInfo.UNKNOWN;
protected int getImpact(@Nullable MBeanOperationInfo info) {
var method = findOperation(info);
var annotation = method != null ? method.getAnnotation(ManagedOperation.class) : null;
if (annotation != null) {
@ -45,9 +45,7 @@ public class AdvancedMBean extends StandardMBean {
}
@Override
protected String getDescription(MBeanOperationInfo info) {
if (info == null) return null;
protected @Nullable String getDescription(@Nullable MBeanOperationInfo info) {
var method = findOperation(info);
var annotation = method != null ? method.getAnnotation(ManagedOperation.class) : null;
if (annotation != null && !"".equals(annotation.description())) {
@ -58,13 +56,9 @@ public class AdvancedMBean extends StandardMBean {
}
@Override
protected String getParameterName(MBeanOperationInfo op, MBeanParameterInfo param, int sequence) {
if (op == null) return super.getParameterName(op, param, sequence);
protected @Nullable String getParameterName(@Nullable MBeanOperationInfo op, @Nullable MBeanParameterInfo param, int sequence) {
var method = findOperation(op);
if (method == null) return super.getParameterName(op, param, sequence);
var annotation = method.getParameters()[sequence].getAnnotation(ManagedParameter.class);
var annotation = method != null ? method.getParameters()[sequence].getAnnotation(ManagedParameter.class) : null;
if (annotation != null && !"".equals(annotation.name())) {
return annotation.name();
} else {
@ -73,13 +67,9 @@ public class AdvancedMBean extends StandardMBean {
}
@Override
protected String getDescription(MBeanOperationInfo op, MBeanParameterInfo param, int sequence) {
if (op == null) return super.getParameterName(op, param, sequence);
protected @Nullable String getDescription(@Nullable MBeanOperationInfo op, @Nullable MBeanParameterInfo param, int sequence) {
var method = findOperation(op);
if (method == null) return super.getParameterName(op, param, sequence);
var annotation = method.getParameters()[sequence].getAnnotation(ManagedParameter.class);
var annotation = method != null ? method.getParameters()[sequence].getAnnotation(ManagedParameter.class) : null;
if (annotation != null && !"".equals(annotation.description())) {
return annotation.description();
} else {
@ -88,7 +78,9 @@ public class AdvancedMBean extends StandardMBean {
}
@Override
protected String getDescription(MBeanAttributeInfo info) {
protected @Nullable String getDescription(@Nullable MBeanAttributeInfo info) {
if (info == null) return super.getDescription(info);
var name = info.getName();
var type = getType(info, MBeanAttributeInfo::getType);
@ -107,7 +99,7 @@ public class AdvancedMBean extends StandardMBean {
return super.getDescription(info);
}
private <T extends MBeanFeatureInfo> String getType(T info, Function<T, String> getType) {
private <T extends MBeanFeatureInfo> String getType(@NotNull T info, @NotNull Function<T, String> getType) {
var descriptor = info.getDescriptor();
if (descriptor == null) return getType.apply(info);
@ -119,7 +111,9 @@ public class AdvancedMBean extends StandardMBean {
}
}
private Method findOperation(MBeanOperationInfo info) {
private @Nullable Method findOperation(@Nullable MBeanOperationInfo info) {
if (info == null) return null;
var name = info.getName();
var params = new ArrayList<String>();
@ -130,11 +124,11 @@ public class AdvancedMBean extends StandardMBean {
return findMethod(name, params);
}
private Method findMethod(String name, String...parameters) {
private @Nullable Method findMethod(@NotNull String name, @NotNull String @NotNull... parameters) {
return findMethod(name, Arrays.asList(parameters));
}
private Method findMethod(String name, List<String> parameters) {
private @Nullable Method findMethod(@NotNull String name, @NotNull List<@NotNull String> parameters) {
var clazz = getMBeanInterface();
methods: for (var method : clazz.getMethods()) {
if (!method.getName().equals(name)) continue;
@ -150,7 +144,7 @@ public class AdvancedMBean extends StandardMBean {
return null;
}
private static String capitalize(String name) {
private static @NotNull String capitalize(@NotNull String name) {
if (name.isEmpty()) return name;
return name.substring(0, 1).toUpperCase(Locale.ROOT) + name.substring(1);
}

@ -5,26 +5,31 @@ import eu.jonahbauer.chat.bot.impl.ChatBotFactory;
import eu.jonahbauer.chat.server.bot.ChatBotSupervisor;
import eu.jonahbauer.chat.server.management.BotConfigSupport;
import eu.jonahbauer.chat.server.management.ChatBotManagerMXBean;
import lombok.SneakyThrows;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Unmodifiable;
import javax.management.JMException;
import java.lang.management.ManagementFactory;
import java.util.*;
import java.util.stream.Collectors;
public enum ChatBotManager implements ChatBotManagerMXBean {
INSTANCE;
public final class ChatBotManager implements ChatBotManagerMXBean {
private final @NotNull ChatBotSupervisor supervisor;
@SneakyThrows
ChatBotManager() {
public static void init(@NotNull ChatBotSupervisor supervisor) throws JMException {
var impl = new ChatBotManager(supervisor);
var server = ManagementFactory.getPlatformMBeanServer();
server.registerMBean(new AdvancedMBean(this), ChatBotManagerMXBean.NAME);
server.registerMBean(new AdvancedMBean(impl), ChatBotManagerMXBean.NAME);
}
private ChatBotManager(@NotNull ChatBotSupervisor supervisor) {
this.supervisor = Objects.requireNonNull(supervisor);
}
@Override
public void start(@NotNull String name, @NotNull String type, @NotNull List<@NotNull String> channels) {
try {
ChatBotSupervisor.INSTANCE.start(name, type, channels);
supervisor.start(name, type, channels);
} catch (BotCreationException ex) {
throw new IllegalArgumentException(ex.getMessage(), ex.getCause());
}
@ -33,7 +38,7 @@ public enum ChatBotManager implements ChatBotManagerMXBean {
@Override
public void start(@NotNull String name, @NotNull BotConfigSupport config) {
try {
ChatBotSupervisor.INSTANCE.start(name, config.unwrap());
supervisor.start(name, config.unwrap());
} catch (BotCreationException ex) {
throw new IllegalArgumentException(ex.getMessage(), ex.getCause());
}
@ -41,23 +46,23 @@ public enum ChatBotManager implements ChatBotManagerMXBean {
@Override
public void stop(@NotNull String name) throws InterruptedException {
ChatBotSupervisor.INSTANCE.stop(name);
supervisor.stop(name);
}
@Override
public void stop() throws InterruptedException {
ChatBotSupervisor.INSTANCE.stop();
supervisor.stop();
}
@Override
public @NotNull Map<@NotNull String, @NotNull BotConfigSupport> getBots() {
public @Unmodifiable @NotNull Map<@NotNull String, @NotNull BotConfigSupport> getBots() {
var out = new TreeMap<String, BotConfigSupport>();
ChatBotSupervisor.INSTANCE.getBots().forEach((key, value) -> out.put(key, new BotConfigSupport(value)));
supervisor.getBots().forEach((key, value) -> out.put(key, new BotConfigSupport(value)));
return Collections.unmodifiableMap(out);
}
@Override
public @NotNull SortedSet<@NotNull String> getBotImplementations() {
public @Unmodifiable @NotNull SortedSet<@NotNull String> getBotImplementations() {
return Collections.unmodifiableSortedSet(
ChatBotFactory.implementations().stream()
.map(Class::getCanonicalName)

@ -6,25 +6,29 @@ import eu.jonahbauer.chat.server.management.BotConfigSupport;
import eu.jonahbauer.chat.server.management.ChatBotSupportMXBean;
import lombok.Getter;
import lombok.extern.log4j.Log4j2;
import org.jetbrains.annotations.NotNull;
import java.lang.management.ManagementFactory;
import java.util.Objects;
@Getter
@Log4j2
public class ChatBotSupport implements ChatBotSupportMXBean {
private final String name;
private final BotConfigSupport config;
public final class ChatBotSupport implements ChatBotSupportMXBean {
private final @NotNull ChatBotSupervisor supervisor;
private final @NotNull String name;
private final @NotNull BotConfigSupport config;
public static void register(String name, BotConfig config) {
public static void register(@NotNull ChatBotSupervisor supervisor, @NotNull String name, @NotNull BotConfig config) {
try {
var impl = new ChatBotSupport(supervisor, name, config);
var server = ManagementFactory.getPlatformMBeanServer();
server.registerMBean(new AdvancedMBean(new ChatBotSupport(name, config)), ChatBotSupportMXBean.getObjectName(name));
server.registerMBean(new AdvancedMBean(impl), ChatBotSupportMXBean.getObjectName(name));
} catch (Exception ex) {
log.error("Could not register bot as an MBean.", ex);
}
}
public static void unregister(String name) {
public static void unregister(@NotNull String name) {
try {
var server = ManagementFactory.getPlatformMBeanServer();
server.unregisterMBean(ChatBotSupportMXBean.getObjectName(name));
@ -33,12 +37,13 @@ public class ChatBotSupport implements ChatBotSupportMXBean {
}
}
private ChatBotSupport(String name, BotConfig config) {
this.name = name;
private ChatBotSupport(@NotNull ChatBotSupervisor supervisor, @NotNull String name, @NotNull BotConfig config) {
this.supervisor = Objects.requireNonNull(supervisor);
this.name = Objects.requireNonNull(name);
this.config = new BotConfigSupport(config);
}
public void stop() throws InterruptedException {
ChatBotSupervisor.INSTANCE.stop(name);
supervisor.stop(name);
}
}

@ -2,43 +2,48 @@ package eu.jonahbauer.chat.server.management.impl;
import eu.jonahbauer.chat.server.management.SocketManagerMXBean;
import eu.jonahbauer.chat.server.socket.SocketSupervisor;
import lombok.SneakyThrows;
import org.jetbrains.annotations.NotNull;
import javax.management.JMException;
import java.lang.management.ManagementFactory;
import java.util.Objects;
import java.util.SortedSet;
public enum SocketManager implements SocketManagerMXBean {
INSTANCE;
public final class SocketManager implements SocketManagerMXBean {
private final @NotNull SocketSupervisor supervisor;
@SneakyThrows
SocketManager() {
public static void init(@NotNull SocketSupervisor supervisor) throws JMException {
var impl = new SocketManager(supervisor);
var server = ManagementFactory.getPlatformMBeanServer();
server.registerMBean(new AdvancedMBean(this), SocketManagerMXBean.NAME);
server.registerMBean(new AdvancedMBean(impl), SocketManagerMXBean.NAME);
}
private SocketManager(@NotNull SocketSupervisor supervisor) {
this.supervisor = Objects.requireNonNull(supervisor);
}
@Override
public void setCredentials(@NotNull String username, @NotNull String password) {
SocketSupervisor.INSTANCE.setAccount(username, password);
supervisor.setAccount(username, password);
}
@Override
public void start(@NotNull String channel) {
SocketSupervisor.INSTANCE.start(channel);
supervisor.start(channel);
}
@Override
public void stop(@NotNull String channel) throws InterruptedException {
SocketSupervisor.INSTANCE.stop(channel);
supervisor.stop(channel);
}
@Override
public void stop() throws InterruptedException {
SocketSupervisor.INSTANCE.stop();
supervisor.stop();
}
@Override
public @NotNull SortedSet<@NotNull String> getChannels() {
return SocketSupervisor.INSTANCE.getChannels();
return supervisor.getChannels();
}
}

@ -3,30 +3,32 @@ package eu.jonahbauer.chat.server.management.impl;
import eu.jonahbauer.chat.server.management.SocketState;
import eu.jonahbauer.chat.server.management.SocketSupportMXBean;
import eu.jonahbauer.chat.server.socket.SocketSupervisor;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import java.lang.management.ManagementFactory;
import java.util.Date;
import java.util.Objects;
@Log4j2
@Getter
@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
public class SocketSupport implements SocketSupportMXBean {
private final String channel;
public final class SocketSupport implements SocketSupportMXBean {
private final @NotNull SocketSupervisor supervisor;
private final @NotNull String channel;
public static void register(String channel) {
public static void register(@NotNull SocketSupervisor supervisor, @NotNull String channel) {
try {
var impl = new SocketSupport(supervisor, channel);
var server = ManagementFactory.getPlatformMBeanServer();
server.registerMBean(new AdvancedMBean(new SocketSupport(channel)), SocketSupportMXBean.getObjectName(channel));
server.registerMBean(new AdvancedMBean(impl), SocketSupportMXBean.getObjectName(channel));
} catch (Exception ex) {
log.error("Could not register socket as an MBean.", ex);
}
}
public static void unregister(String name) {
public static void unregister(@NotNull String name) {
try {
var server = ManagementFactory.getPlatformMBeanServer();
server.unregisterMBean(SocketSupportMXBean.getObjectName(name));
@ -35,30 +37,35 @@ public class SocketSupport implements SocketSupportMXBean {
}
}
private SocketSupport(@NotNull SocketSupervisor supervisor, @NotNull String channel) {
this.supervisor = Objects.requireNonNull(supervisor);
this.channel = Objects.requireNonNull(channel);
}
@Override
public Date getCooldownUntil() {
var cooldown = SocketSupervisor.INSTANCE.getCooldownUntil(channel);
public @Nullable Date getCooldownUntil() {
var cooldown = supervisor.getCooldownUntil(channel);
return cooldown != null ? Date.from(cooldown) : null;
}
@Override
public SocketState getState() {
return SocketSupervisor.INSTANCE.getState(channel);
public @Nullable SocketState getState() {
return supervisor.getState(channel);
}
@Override
public void stop() throws InterruptedException {
SocketSupervisor.INSTANCE.stop(channel);
supervisor.stop(channel);
}
@Override
public void restart() {
SocketSupervisor.INSTANCE.restart(channel);
supervisor.restart(channel);
}
@Override
public void send(String name, String message, boolean bottag, boolean publicid) {
SocketSupervisor.INSTANCE.send(channel, name, message, bottag, publicid);
public void send(@NotNull String name, @NotNull String message, boolean bottag, boolean publicid) {
supervisor.send(channel, name, message, bottag, publicid);
}
}

@ -8,13 +8,16 @@ import eu.jonahbauer.chat.bot.api.Message;
import eu.jonahbauer.chat.server.Config;
import eu.jonahbauer.chat.server.bot.ChatBotSupervisor;
import eu.jonahbauer.chat.server.management.SocketState;
import eu.jonahbauer.chat.server.management.impl.SocketManager;
import eu.jonahbauer.chat.server.management.impl.SocketSupport;
import eu.jonahbauer.chat.server.util.Lazy;
import lombok.SneakyThrows;
import lombok.extern.log4j.Log4j2;
import org.apache.logging.log4j.Level;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import javax.management.JMException;
import java.io.IOException;
import java.net.CookieManager;
import java.net.URI;
@ -32,9 +35,7 @@ import java.util.concurrent.locks.ReentrantLock;
import static eu.jonahbauer.chat.server.util.UrlTemplateProcessor.URL;
@Log4j2
public enum SocketSupervisor implements Chat {
INSTANCE;
public final class SocketSupervisor implements Chat, AutoCloseable {
private static final URI AUTH_SERVER = URI.create("https://chat.qed-verein.de/rubychat/account");
private static final String ORIGIN = "https://chat.qed-verein.de";
private static final String SERVER = "wss://chat.qed-verein.de/websocket?position=0&version=2&channel=";
@ -49,6 +50,13 @@ public enum SocketSupervisor implements Chat {
private final @NotNull CookieManager cookie = new CookieManager();
private final @NotNull HttpClient client = HttpClient.newBuilder().cookieHandler(cookie).build();
private final @NotNull ConcurrentMap<@NotNull String, @NotNull ChatClient> sockets = new ConcurrentHashMap<>();
private final @NotNull Lazy<ChatBotSupervisor> chatBotSupervisor;
private final @NotNull ReentrantLock lock = new ReentrantLock();
public SocketSupervisor(@NotNull Lazy<ChatBotSupervisor> chatBotSupervisor) throws JMException {
SocketManager.init(this);
this.chatBotSupervisor = chatBotSupervisor;
}
public void setAccount(@NotNull Config.Account account) {
this.account = Objects.requireNonNull(account, "account");
@ -82,11 +90,18 @@ public enum SocketSupervisor implements Chat {
* @param config a configuration
* @throws IllegalStateException if any sockets are already running
*/
public synchronized void start(@NotNull Config config) {
if (!this.sockets.isEmpty()) throw new IllegalStateException("start(Config) may not be used when any sockets are running already");
public void start(@NotNull Config config) {
lock.lock();
try {
if (!this.sockets.isEmpty()) {
throw new IllegalStateException("start(Config) may not be used when any sockets are running already");
}
setAccount(config.account());
config.channels().forEach(this::start);
setAccount(config.account());
config.channels().forEach(this::start);
} finally {
lock.unlock();
}
}
/**
@ -94,11 +109,18 @@ public enum SocketSupervisor implements Chat {
* @param channel the channel
* @throws IllegalStateException if a socket is already connected to that channel
*/
public synchronized void start(@NotNull String channel) {
if (sockets.containsKey(channel)) throw new SocketCreationException("Duplicate channel: " + channel);
public void start(@NotNull String channel) {
lock.lock();
try {
if (sockets.containsKey(channel)) {
throw new SocketCreationException("Duplicate channel: " + channel);
}
var socket = new ChatClient(channel);
this.sockets.put(channel, socket);
var socket = new ChatClient(channel);
this.sockets.put(channel, socket);
} finally {
lock.unlock();
}
}
/**
@ -151,6 +173,11 @@ public enum SocketSupervisor implements Chat {
}
}
@Override
public void close() throws InterruptedException {
stop();
}
public @NotNull SortedSet<@NotNull String> getChannels() {
return Collections.unmodifiableSortedSet(new TreeSet<>(sockets.keySet()));
}
@ -191,7 +218,7 @@ public enum SocketSupervisor implements Chat {
return credentials;
}
private record Credentials(String userid, String pwhash) {}
private record Credentials(@NotNull String userid, @NotNull String pwhash) {}
private sealed interface ChatClientState {
@ -218,7 +245,7 @@ public enum SocketSupervisor implements Chat {
}
private final class ChatClient implements WebSocket.Listener, ChatClientState {
private final String channel;
private final @NotNull String channel;
private final @NotNull ReentrantLock lock = new ReentrantLock();
private final @NotNull CountDownLatch stopped = new CountDownLatch(1);
@ -227,7 +254,7 @@ public enum SocketSupervisor implements Chat {
public ChatClient(@NotNull String channel) {
this.channel = channel;
SocketSupport.register(channel);
SocketSupport.register(SocketSupervisor.this, channel);
this.state.onEnter();
}
@ -427,7 +454,7 @@ public enum SocketSupervisor implements Chat {
if (message instanceof Message.Post post) {
delay = post.id() + 1;
ChatBotSupervisor.INSTANCE.onMessage(post);
SocketSupervisor.this.chatBotSupervisor.get().onMessage(post);
}
} catch (JsonProcessingException e) {
log.warn("Could not parse as message: {}", text, e);

@ -0,0 +1,26 @@
package eu.jonahbauer.chat.server.util;
import org.jetbrains.annotations.NotNull;
import java.util.Objects;
public interface Lazy<T> {
@NotNull T get();
class MutableLazy<T> implements Lazy<T> {
private T value;
@Override
public @NotNull T get() {
var value = this.value;
if (value == null) throw new IllegalStateException();
return value;
}
public @NotNull T set(@NotNull T value) {
if (this.value != null) throw new IllegalStateException();
this.value = Objects.requireNonNull(value);
return value;
}
}
}
Loading…
Cancel
Save