initial commit

This commit is contained in:
jbb01
2023-09-12 19:47:07 +02:00
commit eadf1eaf5b
57 changed files with 3407 additions and 0 deletions

26
server/build.gradle.kts Normal file
View File

@@ -0,0 +1,26 @@
plugins {
id("chat-bot.java-conventions")
application
}
group = "eu.jonahbauer.chat"
version = "0.1.0-SNAPSHOT"
val bots = project(":bots").subprojects
dependencies {
implementation(project(":bot-api"))
implementation(project(":management"))
implementation(libs.bundles.log4j2)
implementation(libs.bundles.jackson)
bots.forEach {
implementation(project(it.path))
}
}
application {
mainClass.set("eu.jonahbauer.chat.server.Main")
mainModule.set("eu.jonahbauer.chat.server")
applicationDefaultJvmArgs = listOf("--enable-preview")
}

View File

@@ -0,0 +1,60 @@
package eu.jonahbauer.chat.server;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import eu.jonahbauer.chat.bot.config.BotConfig;
import eu.jonahbauer.chat.server.bot.BotConfigDeserializer;
import org.jetbrains.annotations.NotNull;
import java.io.IOException;
import java.net.URI;
import java.net.URL;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
public record Config(
@NotNull Account account,
@NotNull Set<@NotNull String> channels,
@JsonDeserialize(contentUsing = BotConfigDeserializer.class)
@NotNull Map<@NotNull String, @NotNull BotConfig> bots
) {
private static final String CONFIGURATION_FILE_ENV = "CHAT_BOT_CONFIG";
private static final String CONFIGURATION_FILE_PROPERTY = "chatbot.configurationFile";
private static final String DEFAULT_CONFIGURATION_FILE = "file:./config.json";
public static @NotNull Config load() throws IOException {
var env = System.getenv(CONFIGURATION_FILE_ENV);
if (env != null) return read(env);
var prop = System.getProperty(CONFIGURATION_FILE_PROPERTY);
if (prop != null) return read(prop);
return read(DEFAULT_CONFIGURATION_FILE);
}
public static @NotNull Config read(@NotNull String url) throws IOException {
return read(URI.create(url).toURL());
}
public static @NotNull Config read(@NotNull URL url) throws IOException {
var mapper = new ObjectMapper();
return mapper.readValue(url, Config.class);
}
public Config {
Objects.requireNonNull(account, "account");
channels = Set.copyOf(channels); // implicit null check
bots = Map.copyOf(bots); // implicit null check
}
public record Account(
@NotNull String username,
@NotNull String password
) {
public Account {
Objects.requireNonNull(username, "username");
Objects.requireNonNull(password, "password");
}
}
}

View File

@@ -0,0 +1,30 @@
package eu.jonahbauer.chat.server;
import eu.jonahbauer.chat.server.bot.ChatBotSupervisor;
import eu.jonahbauer.chat.server.management.impl.ChatBotManager;
import eu.jonahbauer.chat.server.management.impl.SocketManager;
import eu.jonahbauer.chat.server.socket.SocketSupervisor;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
public class Main {
public static void main(String[] args) throws IOException, InterruptedException {
// initialize ChatBotManager and SocketManager
var _ = ChatBotManager.INSTANCE;
var _ = SocketManager.INSTANCE;
var config = Config.load();
ChatBotSupervisor.INSTANCE.start(config);
SocketSupervisor.INSTANCE.start(config);
try {
// keep main thread running
new CountDownLatch(1).await();
} catch (InterruptedException e) {
SocketSupervisor.INSTANCE.stop();
ChatBotSupervisor.INSTANCE.stop();
}
}
}

View File

@@ -0,0 +1,61 @@
package eu.jonahbauer.chat.server.bot;
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
import eu.jonahbauer.chat.bot.config.BotConfig;
import eu.jonahbauer.chat.bot.config.BotConfigurationException;
import java.io.IOException;
import java.util.ArrayList;
public class BotConfigDeserializer extends StdDeserializer<BotConfig> {
protected BotConfigDeserializer() {
super(BotConfig.class);
}
@Override
public BotConfig deserialize(JsonParser p, DeserializationContext ctxt) throws IOException {
if (p.currentToken() == JsonToken.START_OBJECT) {
p.nextToken();
}
var builder = BotConfig.builder();
while (p.currentToken() == JsonToken.FIELD_NAME) {
var key = p.currentName();
switch (p.nextToken()) {
case VALUE_STRING -> builder.value(key, p.getValueAsString());
case VALUE_FALSE -> builder.value(key, false);
case VALUE_TRUE -> builder.value(key, true);
case VALUE_NULL -> {}
case VALUE_NUMBER_FLOAT -> builder.value(key, p.getDoubleValue());
case VALUE_NUMBER_INT -> builder.value(key, p.getLongValue());
case START_ARRAY -> {
var list = new ArrayList<String>();
while (p.nextToken() != JsonToken.END_ARRAY) {
if (p.currentToken() == JsonToken.VALUE_STRING) {
list.add(p.getValueAsString());
} else {
throw new BotConfigurationException("Unsupported property type in array.");
}
}
builder.value(key, list);
}
case START_OBJECT -> throw new BotConfigurationException("Unsupported property type: object");
default -> throw new BotConfigurationException("Unsupported property property type.");
}
p.nextToken();
}
if (p.currentToken() == JsonToken.END_OBJECT) {
return builder.build();
}
throw new JsonParseException(p, "Unexpected token: " + p.currentToken(), p.currentLocation());
}
}

View File

@@ -0,0 +1,242 @@
package eu.jonahbauer.chat.server.bot;
import eu.jonahbauer.chat.bot.api.ChatBot;
import eu.jonahbauer.chat.bot.api.Message;
import eu.jonahbauer.chat.bot.config.BotConfig;
import eu.jonahbauer.chat.bot.impl.BotCreationException;
import eu.jonahbauer.chat.bot.impl.ChatBotFactory;
import eu.jonahbauer.chat.server.Config;
import eu.jonahbauer.chat.server.management.impl.ChatBotSupport;
import eu.jonahbauer.chat.server.socket.SocketSupervisor;
import lombok.extern.log4j.Log4j2;
import org.jetbrains.annotations.Blocking;
import org.jetbrains.annotations.Contract;
import org.jetbrains.annotations.NonBlocking;
import org.jetbrains.annotations.NotNull;
import java.time.Duration;
import java.time.Instant;
import java.util.*;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
@Log4j2
public enum ChatBotSupervisor {
INSTANCE;
private static final Duration POST_EXPIRATION = Duration.ofSeconds(2);
private static final int MESSAGE_QUEUE_SIZE = 10;
/**
* Map of running bots indexed by their name.
*/
private final @NotNull Map<@NotNull String, @NotNull RunningBot> bots = new HashMap<>();
/**
* Dispatches the message to all running bots. If a bot has too many messages queued up, the message may not be
* delivered to that bot.
* @param message the message
*/
public void onMessage(@NotNull Message.Post message) {
log.debug("Dispatching message: {}", message);
var expiration = Instant.now().plus(POST_EXPIRATION);
for (var entry : bots.entrySet()) {
var result = entry.getValue().offer(message, expiration);
if (!result) log.warn("A message was dropped because bot {} is busy.", entry.getKey());
}
}
/**
* Starts all bots defined in the given config.
* @param config a configuration
* @throws IllegalStateException if any bots are running already
*/
public synchronized void start(@NotNull Config config) {
if (!bots.isEmpty()) throw new IllegalStateException("start(Config) may not be used when any bots are running already");
config.bots().forEach(this::start);
}
/**
* Starts a bot with the given name and type.
* @param name the name of the bot (must be unique)
* @param type the fully qualified name of a class extending {@link ChatBot}
* @param channels the channels the bot will be active in
* @throws BotCreationException if the bot could not be created
*/
public synchronized void start(@NotNull String name, @NotNull String type, @NotNull List<String> channels) {
start(name, BotConfig.builder().type(type).channels(channels).build());
}
/**
* Starts a bot with the given config under the specified name.
* @param name the name of the bot (must be unique)
* @param config the bot configuration
* @throws BotCreationException if the bot could not be created
*/
public synchronized void start(@NotNull String name, @NotNull BotConfig config) {
if (bots.containsKey(name)) throw new BotCreationException("Duplicate bot name: " + name);
bots.put(name, new RunningBot(name, config));
}
/**
* Stops the bot with the given name and waits for it to finish. Does nothing if no bot with that name exists.
* @param name the name of the bot
* @throws InterruptedException if any thread has interrupted the current thread.
*/
public void stop(@NotNull String name) throws InterruptedException {
RunningBot bot;
synchronized (this) {
bot = bots.remove(name);
if (bot == null) return;
bot.stop();
}
bot.join();
}
/**
* Stops all currently running bots and waits for them to finish.
* @throws InterruptedException if any thread has interrupted the current thread.
*/
public void stop() throws InterruptedException {
var stopped = new ArrayList<RunningBot>();
synchronized (this) {
for (RunningBot bot : bots.values()) {
stopped.add(bot);
bot.stop();
}
bots.clear();
}
for (var bot : stopped) {
bot.join();
}
}
/**
* {@return a map of all currently running bots and their effective config}
*/
public @NotNull Map<@NotNull String, @NotNull BotConfig> getBots() {
var out = new TreeMap<String, BotConfig>();
for (var entry : bots.entrySet()) {
out.put(entry.getKey(), entry.getValue().getConfig());
}
return Collections.unmodifiableMap(out);
}
@Contract("_ -> new")
@SuppressWarnings("unchecked")
private static @NotNull ChatBotFactory<?> getChatBotFactory(@NotNull String type) {
try {
var clazz = Class.forName(type);
if (!ChatBot.class.isAssignableFrom(clazz)) {
throw new BotCreationException("Not a chat bot type: " + type);
}
return new ChatBotFactory<>((Class<? extends ChatBot>) clazz);
} catch (ClassNotFoundException ex) {
throw new BotCreationException("Unknown chat bot of type: " + type, ex);
}
}
/**
* Manages a {@link ChatBot} instance running on its own thread. Takes care of recreating the bot after it threw
* an exception. {@linkplain ChatBotSupport#register(String, BotConfig) Registers} the bot during construction
* and {@linkplain ChatBotSupport#unregister(String) unregisters} it during {@link #stop()}.
*/
private static class RunningBot implements Runnable {
private final @NotNull String name;
private final @NotNull BotConfig config;
private final @NotNull ChatBotFactory<?> factory;
private final @NotNull Thread thread;
private final @NotNull BlockingQueue<@NotNull PendingPost> queue = new ArrayBlockingQueue<>(MESSAGE_QUEUE_SIZE);
private final AtomicBoolean stopped = new AtomicBoolean();
private volatile ChatBot bot;
public RunningBot(@NotNull String name, @NotNull BotConfig config) {
this.name = name;
this.config = config;
this.factory = getChatBotFactory(config.getType());
log.info("starting bot {}...", name);
ChatBotSupport.register(name, config);
this.thread = Thread.ofVirtual().name("ChatBot[" + name + "]").start(this);
}
@Override
public void run() {
log.info("started bot {}", name);
while (!stopped.get()) {
var bot = factory.create(config);
this.bot = bot;
try {
loop(bot);
} catch (Exception ex) {
log.warn("bot {} threw an exception and will be recreated", name, ex);
continue;
}
try {
bot.onStop();
} catch (Exception ex) {
log.warn("bot {} threw an exception during shutdown", name, ex);
}
}
log.info("stopped bot {}", name);
}
private void loop(@NotNull ChatBot bot) {
while (!stopped.get()) {
try {
var post = queue.take();
if (Instant.now().isAfter(post.expiration())) {
log.warn("A message was dropped because it expired: {}", post);
} else if (bot.getConfig().isIgnoreBots() && post.post().bot()) {
log.debug("Ignoring message {} because bottag is set.", post.post());
} else if (!bot.getConfig().getChannels().contains(post.post().channel())) {
log.debug("Ignoring message {} because channel is not listened to.", post.post());
} else {
bot.onMessage(SocketSupervisor.INSTANCE, post.post());
}
} catch (InterruptedException _) {
}
}
}
@NonBlocking
public boolean offer(@NotNull Message.Post post, @NotNull Instant expiration) {
if (stopped.get()) return false;
return queue.offer(new PendingPost(post, expiration));
}
/**
* Stops the bot, unregistering it from the MBean server and preventing it from receiving any new messages.
*/
@NonBlocking
public void stop() {
if (stopped.getAndSet(true)) return;
log.info("stopping bot {}", name);
ChatBotSupport.unregister(name);
thread.interrupt();
}
/**
* Waits for the bot to finish execution.
*/
@Blocking
public void join() throws InterruptedException {
thread.join();
}
public @NotNull BotConfig getConfig() {
var bot = this.bot;
return bot != null ? bot.getConfig() : config;
}
private record PendingPost(@NotNull Message.Post post, @NotNull Instant expiration) {}
}
}

View File

@@ -0,0 +1,157 @@
package eu.jonahbauer.chat.server.management.impl;
import eu.jonahbauer.chat.server.management.annotations.ManagedAttribute;
import eu.jonahbauer.chat.server.management.annotations.ManagedBean;
import eu.jonahbauer.chat.server.management.annotations.ManagedOperation;
import eu.jonahbauer.chat.server.management.annotations.ManagedParameter;
import lombok.extern.log4j.Log4j2;
import javax.management.*;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.function.Function;
@Log4j2
public class AdvancedMBean extends StandardMBean {
public <T> AdvancedMBean(T implementation) {
super(implementation, null, true);
}
@Override
protected String getDescription(MBeanInfo info) {
var clazz = getMBeanInterface();
var annotation = clazz.getAnnotation(ManagedBean.class);
if (annotation != null && !"".equals(annotation.description())) {
return annotation.description();
} else {
return super.getDescription(info);
}
}
@Override
protected int getImpact(MBeanOperationInfo info) {
if (info == null) return MBeanOperationInfo.UNKNOWN;
var method = findOperation(info);
var annotation = method != null ? method.getAnnotation(ManagedOperation.class) : null;
if (annotation != null) {
return annotation.impact().getCode();
} else {
return super.getImpact(info);
}
}
@Override
protected String getDescription(MBeanOperationInfo info) {
if (info == null) return null;
var method = findOperation(info);
var annotation = method != null ? method.getAnnotation(ManagedOperation.class) : null;
if (annotation != null && !"".equals(annotation.description())) {
return annotation.description();
} else {
return super.getDescription(info);
}
}
@Override
protected String getParameterName(MBeanOperationInfo op, MBeanParameterInfo param, int sequence) {
if (op == null) return super.getParameterName(op, param, sequence);
var method = findOperation(op);
if (method == null) return super.getParameterName(op, param, sequence);
var annotation = method.getParameters()[sequence].getAnnotation(ManagedParameter.class);
if (annotation != null && !"".equals(annotation.name())) {
return annotation.name();
} else {
return super.getParameterName(op, param, sequence);
}
}
@Override
protected String getDescription(MBeanOperationInfo op, MBeanParameterInfo param, int sequence) {
if (op == null) return super.getParameterName(op, param, sequence);
var method = findOperation(op);
if (method == null) return super.getParameterName(op, param, sequence);
var annotation = method.getParameters()[sequence].getAnnotation(ManagedParameter.class);
if (annotation != null && !"".equals(annotation.description())) {
return annotation.description();
} else {
return super.getParameterName(op, param, sequence);
}
}
@Override
protected String getDescription(MBeanAttributeInfo info) {
var name = info.getName();
var type = getType(info, MBeanAttributeInfo::getType);
var getter = findMethod(("boolean".equals(type) ? "is" : "get") + capitalize(name));
if (getter != null) {
var annotation = getter.getAnnotation(ManagedAttribute.class);
if (annotation != null && !"".equals(annotation.description())) return annotation.description();
}
var setter = findMethod("set" + capitalize(name), type);
if (setter != null) {
var annotation = setter.getAnnotation(ManagedAttribute.class);
if (annotation != null && !"".equals(annotation.description())) return annotation.description();
}
return super.getDescription(info);
}
private <T extends MBeanFeatureInfo> String getType(T info, Function<T, String> getType) {
var descriptor = info.getDescriptor();
if (descriptor == null) return getType.apply(info);
try {
var originalType = (String) descriptor.getFieldValue(JMX.ORIGINAL_TYPE_FIELD);
return originalType.contains("<") ? originalType.substring(0, originalType.indexOf("<")) : originalType;
} catch (ClassCastException | RuntimeOperationsException ignored) {
return getType.apply(info);
}
}
private Method findOperation(MBeanOperationInfo info) {
var name = info.getName();
var params = new ArrayList<String>();
for (var parameter : info.getSignature()) {
params.add(getType(parameter, MBeanParameterInfo::getType));
}
return findMethod(name, params);
}
private Method findMethod(String name, String...parameters) {
return findMethod(name, Arrays.asList(parameters));
}
private Method findMethod(String name, List<String> parameters) {
var clazz = getMBeanInterface();
methods: for (var method : clazz.getMethods()) {
if (!method.getName().equals(name)) continue;
if (method.getParameterCount() != parameters.size()) continue;
var parameterTypes = method.getParameterTypes();
for (int i = 0; i < parameters.size(); i++) {
if (!parameters.get(i).equals(parameterTypes[i].getName())) continue methods;
}
return method;
}
return null;
}
private static String capitalize(String name) {
if (name.isEmpty()) return name;
return name.substring(0, 1).toUpperCase(Locale.ROOT) + name.substring(1);
}
}

View File

@@ -0,0 +1,67 @@
package eu.jonahbauer.chat.server.management.impl;
import eu.jonahbauer.chat.bot.impl.BotCreationException;
import eu.jonahbauer.chat.bot.impl.ChatBotFactory;
import eu.jonahbauer.chat.server.bot.ChatBotSupervisor;
import eu.jonahbauer.chat.server.management.BotConfigSupport;
import eu.jonahbauer.chat.server.management.ChatBotManagerMXBean;
import lombok.SneakyThrows;
import org.jetbrains.annotations.NotNull;
import java.lang.management.ManagementFactory;
import java.util.*;
import java.util.stream.Collectors;
public enum ChatBotManager implements ChatBotManagerMXBean {
INSTANCE;
@SneakyThrows
ChatBotManager() {
var server = ManagementFactory.getPlatformMBeanServer();
server.registerMBean(new AdvancedMBean(this), ChatBotManagerMXBean.NAME);
}
@Override
public void start(@NotNull String name, @NotNull String type, @NotNull List<@NotNull String> channels) {
try {
ChatBotSupervisor.INSTANCE.start(name, type, channels);
} catch (BotCreationException ex) {
throw new IllegalArgumentException(ex.getMessage(), ex.getCause());
}
}
@Override
public void start(@NotNull String name, @NotNull BotConfigSupport config) {
try {
ChatBotSupervisor.INSTANCE.start(name, config.unwrap());
} catch (BotCreationException ex) {
throw new IllegalArgumentException(ex.getMessage(), ex.getCause());
}
}
@Override
public void stop(@NotNull String name) throws InterruptedException {
ChatBotSupervisor.INSTANCE.stop(name);
}
@Override
public void stop() throws InterruptedException {
ChatBotSupervisor.INSTANCE.stop();
}
@Override
public @NotNull Map<@NotNull String, @NotNull BotConfigSupport> getBots() {
var out = new TreeMap<String, BotConfigSupport>();
ChatBotSupervisor.INSTANCE.getBots().forEach((key, value) -> out.put(key, new BotConfigSupport(value)));
return Collections.unmodifiableMap(out);
}
@Override
public @NotNull SortedSet<@NotNull String> getBotImplementations() {
return Collections.unmodifiableSortedSet(
ChatBotFactory.implementations().stream()
.map(Class::getCanonicalName)
.collect(Collectors.toCollection(TreeSet::new))
);
}
}

View File

@@ -0,0 +1,44 @@
package eu.jonahbauer.chat.server.management.impl;
import eu.jonahbauer.chat.bot.config.BotConfig;
import eu.jonahbauer.chat.server.bot.ChatBotSupervisor;
import eu.jonahbauer.chat.server.management.BotConfigSupport;
import eu.jonahbauer.chat.server.management.ChatBotSupportMXBean;
import lombok.Getter;
import lombok.extern.log4j.Log4j2;
import java.lang.management.ManagementFactory;
@Getter
@Log4j2
public class ChatBotSupport implements ChatBotSupportMXBean {
private final String name;
private final BotConfigSupport config;
public static void register(String name, BotConfig config) {
try {
var server = ManagementFactory.getPlatformMBeanServer();
server.registerMBean(new AdvancedMBean(new ChatBotSupport(name, config)), ChatBotSupportMXBean.getObjectName(name));
} catch (Exception ex) {
log.error("Could not register bot as an MBean.", ex);
}
}
public static void unregister(String name) {
try {
var server = ManagementFactory.getPlatformMBeanServer();
server.unregisterMBean(ChatBotSupportMXBean.getObjectName(name));
} catch (Exception ex) {
log.error("Could not unregister bot as an MBean.", ex);
}
}
private ChatBotSupport(String name, BotConfig config) {
this.name = name;
this.config = new BotConfigSupport(config);
}
public void stop() throws InterruptedException {
ChatBotSupervisor.INSTANCE.stop(name);
}
}

View File

@@ -0,0 +1,44 @@
package eu.jonahbauer.chat.server.management.impl;
import eu.jonahbauer.chat.server.management.SocketManagerMXBean;
import eu.jonahbauer.chat.server.socket.SocketSupervisor;
import lombok.SneakyThrows;
import org.jetbrains.annotations.NotNull;
import java.lang.management.ManagementFactory;
import java.util.SortedSet;
public enum SocketManager implements SocketManagerMXBean {
INSTANCE;
@SneakyThrows
SocketManager() {
var server = ManagementFactory.getPlatformMBeanServer();
server.registerMBean(new AdvancedMBean(this), SocketManagerMXBean.NAME);
}
@Override
public void setCredentials(@NotNull String username, @NotNull String password) {
SocketSupervisor.INSTANCE.setAccount(username, password);
}
@Override
public void start(@NotNull String channel) {
SocketSupervisor.INSTANCE.start(channel);
}
@Override
public void stop(@NotNull String channel) throws InterruptedException {
SocketSupervisor.INSTANCE.stop(channel);
}
@Override
public void stop() throws InterruptedException {
SocketSupervisor.INSTANCE.stop();
}
@Override
public @NotNull SortedSet<@NotNull String> getChannels() {
return SocketSupervisor.INSTANCE.getChannels();
}
}

View File

@@ -0,0 +1,64 @@
package eu.jonahbauer.chat.server.management.impl;
import eu.jonahbauer.chat.server.management.SocketState;
import eu.jonahbauer.chat.server.management.SocketSupportMXBean;
import eu.jonahbauer.chat.server.socket.SocketSupervisor;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;
import java.lang.management.ManagementFactory;
import java.util.Date;
@Log4j2
@Getter
@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
public class SocketSupport implements SocketSupportMXBean {
private final String channel;
public static void register(String channel) {
try {
var server = ManagementFactory.getPlatformMBeanServer();
server.registerMBean(new AdvancedMBean(new SocketSupport(channel)), SocketSupportMXBean.getObjectName(channel));
} catch (Exception ex) {
log.error("Could not register socket as an MBean.", ex);
}
}
public static void unregister(String name) {
try {
var server = ManagementFactory.getPlatformMBeanServer();
server.unregisterMBean(SocketSupportMXBean.getObjectName(name));
} catch (Exception ex) {
log.error("Could not unregister socket as an MBean.", ex);
}
}
@Override
public Date getCooldownUntil() {
var cooldown = SocketSupervisor.INSTANCE.getCooldownUntil(channel);
return cooldown != null ? Date.from(cooldown) : null;
}
@Override
public SocketState getState() {
return SocketSupervisor.INSTANCE.getState(channel);
}
@Override
public void stop() throws InterruptedException {
SocketSupervisor.INSTANCE.stop(channel);
}
@Override
public void restart() {
SocketSupervisor.INSTANCE.restart(channel);
}
@Override
public void send(String name, String message, boolean bottag, boolean publicid) {
SocketSupervisor.INSTANCE.send(channel, name, message, bottag, publicid);
}
}

View File

@@ -0,0 +1,21 @@
package eu.jonahbauer.chat.server.socket;
import com.fasterxml.jackson.annotation.JsonFormat;
import org.jetbrains.annotations.NotNull;
import java.util.Objects;
public record OutgoingMessage(
@NotNull String name,
@NotNull String message,
@NotNull String channel,
long delay,
@JsonFormat(shape = JsonFormat.Shape.NUMBER) boolean publicid,
@JsonFormat(shape = JsonFormat.Shape.NUMBER) boolean bottag
) {
public OutgoingMessage {
Objects.requireNonNull(channel, "channel");
Objects.requireNonNull(name, "name");
Objects.requireNonNull(message, "message");
}
}

View File

@@ -0,0 +1,7 @@
package eu.jonahbauer.chat.server.socket;
import lombok.experimental.StandardException;
@StandardException
public class SocketCreationException extends RuntimeException {
}

View File

@@ -0,0 +1,576 @@
package eu.jonahbauer.chat.server.socket;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import eu.jonahbauer.chat.bot.api.Chat;
import eu.jonahbauer.chat.bot.api.Message;
import eu.jonahbauer.chat.server.Config;
import eu.jonahbauer.chat.server.bot.ChatBotSupervisor;
import eu.jonahbauer.chat.server.management.SocketState;
import eu.jonahbauer.chat.server.management.impl.SocketSupport;
import lombok.SneakyThrows;
import lombok.extern.log4j.Log4j2;
import org.apache.logging.log4j.Level;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import java.io.IOException;
import java.net.CookieManager;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpRequest.BodyPublishers;
import java.net.http.HttpResponse;
import java.net.http.WebSocket;
import java.nio.ByteBuffer;
import java.time.Instant;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.locks.ReentrantLock;
import static eu.jonahbauer.chat.server.util.UrlTemplateProcessor.URL;
@Log4j2
public enum SocketSupervisor implements Chat {
INSTANCE;
private static final URI AUTH_SERVER = URI.create("https://chat.qed-verein.de/rubychat/account");
private static final String ORIGIN = "https://chat.qed-verein.de";
private static final String SERVER = "wss://chat.qed-verein.de/websocket?position=0&version=2&channel=";
private static final int PING_INTERVAL = 30;
private static final ScheduledExecutorService SCHEDULER = Executors.newScheduledThreadPool(1, Thread.ofVirtual().name("SocketSupervisor").factory());
private static final ObjectMapper MAPPER = new ObjectMapper().registerModule(new JavaTimeModule());
private volatile @Nullable Config.Account account;
private volatile @Nullable Credentials credentials;
private final @NotNull CookieManager cookie = new CookieManager();
private final @NotNull HttpClient client = HttpClient.newBuilder().cookieHandler(cookie).build();
private final @NotNull ConcurrentMap<@NotNull String, @NotNull ChatClient> sockets = new ConcurrentHashMap<>();
public void setAccount(@NotNull Config.Account account) {
this.account = Objects.requireNonNull(account, "account");
}
public void setAccount(@NotNull String username, @NotNull String password) {
this.account = new Config.Account(username, password);
}
@Override
public boolean send(@NotNull String channel, @NotNull String name, @NotNull String message, boolean bottag, boolean publicId) {
var socket = sockets.get(channel);
if (socket == null || !socket.isOpen()) {
log.error("Cannot deliver message to {}: not connected", channel);
return false;
} else {
var out = new OutgoingMessage(name, message, channel, socket.delay, publicId, bottag);
try {
socket.send(MAPPER.writeValueAsString(out));
log.info("Sending message: {}", out);
return true;
} catch (JsonProcessingException e) {
log.error("Could not serialize message: {}", out, e);
return false;
}
}
}
/**
* Starts all sockets defined in the given config.
* @param config a configuration
* @throws IllegalStateException if any sockets are already running
*/
public synchronized void start(@NotNull Config config) {
if (!this.sockets.isEmpty()) throw new IllegalStateException("start(Config) may not be used when any sockets are running already");
setAccount(config.account());
config.channels().forEach(this::start);
}
/**
* Starts a socket for the given channel.
* @param channel the channel
* @throws IllegalStateException if a socket is already connected to that channel
*/
public synchronized void start(@NotNull String channel) {
if (sockets.containsKey(channel)) throw new SocketCreationException("Duplicate channel: " + channel);
var socket = new ChatClient(channel);
this.sockets.put(channel, socket);
}
/**
* Forcefully restarts the socket for the given channel when it is on cooldown.
* @param channel the channel
* @throws IllegalStateException if the socket is not on cooldown
*/
public void restart(@NotNull String channel) {
var socket = sockets.get(channel);
if (socket != null) socket.restart();
}
public @Nullable SocketState getState(@NotNull String channel) {
var socket = sockets.get(channel);
return socket == null ? null : socket.getState();
}
public @Nullable Instant getCooldownUntil(@NotNull String channel) {
var socket = sockets.get(channel);
return socket == null ? null : socket.getCooldownUntil();
}
/**
* Stops the socket for the given channel and waits for it to finish.
* @param channel the channel
* @throws InterruptedException if any thread has interrupted the current thread.
*/
public void stop(@NotNull String channel) throws InterruptedException {
var socket = sockets.get(channel);
if (socket == null) return;
socket.stop();
socket.join();
}
/**
* Stops all currently running sockets and waits for them to finish.
* @throws InterruptedException if any thread has interrupted the current thread.
*/
public void stop() throws InterruptedException {
var sockets = new ArrayList<>(this.sockets.values());
for (var socket : sockets) {
socket.stop();
}
for (var socket : sockets) {
socket.join();
}
}
public @NotNull SortedSet<@NotNull String> getChannels() {
return Collections.unmodifiableSortedSet(new TreeSet<>(sockets.keySet()));
}
@SneakyThrows
private @NotNull Credentials login() {
var account = this.account;
if (account == null) throw new IllegalStateException("Account not initialized.");
var credentials = this.credentials;
if (credentials != null) return credentials;
var request = HttpRequest.newBuilder().uri(AUTH_SERVER)
.POST(BodyPublishers.ofString(
URL."username=\{account.username()}&password=\{account.password()}&version=20171030131648"
))
.build();
var response = client.send(request, HttpResponse.BodyHandlers.ofString());
if (response.statusCode() != 200) {
throw new IllegalArgumentException(STR."invalid credentials (status code: \{response.statusCode()})");
}
var body = response.body();
if (!body.contains("success")) {
throw new IllegalArgumentException(STR."invalid credentials (\{body})");
}
var cookies = cookie.getCookieStore().get(AUTH_SERVER);
var userid = cookies.stream().filter(cookie -> cookie.getName().equals("userid")).findFirst();
var pwhash = cookies.stream().filter(cookie -> cookie.getName().equals("pwhash")).findFirst();
if (userid.isEmpty() || pwhash.isEmpty()) {
throw new IllegalArgumentException(STR."invalid credentials (status code: \{response.statusCode()})");
}
this.credentials = credentials = new Credentials(userid.get().getValue(), pwhash.get().getValue());
return credentials;
}
private record Credentials(String userid, String pwhash) {}
private sealed interface ChatClientState {
default void onEnter() {}
default void send(@NotNull String message) {
throw new IllegalStateException();
}
default void restart() {
throw new IllegalStateException();
}
default void stop() {
abort();
}
default void abort() {
throw new IllegalStateException();
}
@NotNull SocketState getState();
}
private final class ChatClient implements WebSocket.Listener, ChatClientState {
private final String channel;
private final @NotNull ReentrantLock lock = new ReentrantLock();
private final @NotNull CountDownLatch stopped = new CountDownLatch(1);
private @NotNull ChatClientState state = new Created();
private volatile long delay = -1;
public ChatClient(@NotNull String channel) {
this.channel = channel;
SocketSupport.register(channel);
this.state.onEnter();
}
private void transition(@NotNull ChatClientState from, @NotNull ChatClientState to) {
lock.lock();
try {
if (state == from) {
state = to;
state.onEnter();
}
} finally {
lock.unlock();
}
}
public boolean isOpen() {
return getState() == SocketState.CONNECTED;
}
public @Nullable Instant getCooldownUntil() {
return state instanceof Cooldown cooldown ? cooldown.until : null;
}
@Override
public void send(@NotNull String message) {
lock.lock();
try {
state.send(message);
} finally {
lock.unlock();
}
}
@Override
public void restart() {
lock.lock();
try {
state.restart();
} finally {
lock.unlock();
}
}
@Override
public void stop() {
lock.lock();
try {
state.stop();
} finally {
lock.unlock();
}
}
@Override
public void abort() {
lock.lock();
try {
state.abort();
} finally {
lock.unlock();
}
}
@Override
public @NotNull SocketState getState() {
return state.getState();
}
public void join() throws InterruptedException {
stopped.await();
}
private <T extends Throwable> void withNamedThread(@NotNull ThrowingRunnable<T> runnable) throws T {
var name = Thread.currentThread().getName();
try {
Thread.currentThread().setName(STR."Socket[\{channel}]");
runnable.run();
} finally {
Thread.currentThread().setName(name);
}
}
private <R, T extends Throwable> R withNamedThread(@NotNull ThrowingSupplier<R, T> supplier) throws T {
var name = Thread.currentThread().getName();
try {
Thread.currentThread().setName(STR."Socket[\{channel}]");
return supplier.get();
} finally {
Thread.currentThread().setName(name);
}
}
private interface ThrowingRunnable<T extends Throwable> {
void run() throws T;
}
private interface ThrowingSupplier<R, T extends Throwable> {
R get() throws T;
}
public final class Created implements ChatClientState {
@Override
public void onEnter() {
transition(this, new Connecting());
}
@Override
public @NotNull SocketState getState() {
return SocketState.CREATED;
}
}
public final class Connecting implements ChatClientState {
private final int cooldown;
public Connecting() {
this(1);
}
public Connecting(int cooldown) {
this.cooldown = cooldown;
}
@Override
public void onEnter() {
log.info("starting socket {}", channel);
try {
var connected = new Connected();
transition(this, connected);
} catch (Exception ex) {
log.warn("socket {} failed with an exception.", channel, ex);
transition(this, new Cooldown(cooldown));
}
}
@Override
public @NotNull SocketState getState() {
return SocketState.CONNECTING;
}
}
public final class Connected implements ChatClientState, WebSocket.Listener {
private final @NotNull WebSocket socket;
private @Nullable ScheduledFuture<?> ping;
private volatile boolean stopped;
private final @NotNull List<@NotNull CharSequence> parts = new ArrayList<>();
private volatile CompletableFuture<?> message = new CompletableFuture<>();
public Connected() throws ExecutionException, InterruptedException {
var credentials = SocketSupervisor.this.login();
this.socket = SocketSupervisor.this.client.newWebSocketBuilder()
.header("Origin", ORIGIN)
.header("Cookie", URL."userid=\{credentials.userid()}; pwhash=\{credentials.pwhash()}")
.buildAsync(URI.create(SERVER + URL."\{channel}"), this)
.get();
}
@Override
public void onOpen(@NotNull WebSocket webSocket) {
withNamedThread(() -> {
log.info("started socket {}", channel);
ping = SCHEDULER.scheduleAtFixedRate(this::ping, PING_INTERVAL, PING_INTERVAL, TimeUnit.SECONDS);
WebSocket.Listener.super.onOpen(webSocket);
});
}
@Override
public @Nullable CompletionStage<?> onText(@NotNull WebSocket webSocket, @NotNull CharSequence message, boolean last) {
return withNamedThread(() -> {
this.parts.add(message);
webSocket.request(1);
if (last) {
onMessage(this.parts);
this.parts.clear();
this.message.complete(null);
CompletionStage<?> out = this.message;
this.message = new CompletableFuture<>();
return out;
}
return this.message;
});
}
private void onMessage(@NotNull List<@NotNull CharSequence> parts) {
var text = String.join("", parts);
try {
var message = MAPPER.readValue(text, Message.class);
var level = message instanceof Message.Post || message instanceof Message.Ack ? Level.INFO : Level.DEBUG;
log.log(level, "Received message: {}", message);
if (message instanceof Message.Post post) {
delay = post.id() + 1;
ChatBotSupervisor.INSTANCE.onMessage(post);
}
} catch (JsonProcessingException e) {
log.warn("Could not parse as message: {}", text, e);
}
}
@Override
public @Nullable CompletionStage<?> onBinary(WebSocket webSocket, ByteBuffer data, boolean last) {
return withNamedThread(() -> {
log.warn("Socket {} received binary data.", channel);
return WebSocket.Listener.super.onBinary(webSocket, data, last);
});
}
@Override
public @Nullable CompletionStage<?> onClose(@NotNull WebSocket webSocket, int statusCode, @NotNull String reason) {
return withNamedThread(() -> {
log.info("socket {} closed (code={}, reason={})", channel, statusCode, reason);
lock.lock();
try {
if (stopped) {
transition(this, new Stopped());
} else {
transition(this, new Cooldown(1));
}
} finally {
lock.unlock();
}
return null;
});
}
@Override
public void onError(WebSocket webSocket, Throwable error) {
withNamedThread(() -> {
log.warn("Socket {} failed with an exception.", channel, error);
lock.lock();
try {
if (stopped) {
transition(this, new Stopped());
} else {
transition(this, new Cooldown(1));
}
} finally {
lock.unlock();
}
});
}
@Override
public void send(@NotNull String message) {
if (stopped) throw new IllegalStateException();
socket.sendText(message, true);
}
@Override
public void abort() {
if (ping != null) ping.cancel(true);
socket.abort();
transition(this, new Stopped());
}
@Override
public void stop() {
if (stopped) throw new IllegalStateException();
log.info("stopping socket {}", channel);
if (ping != null) ping.cancel(true);
stopped = true;
socket.sendClose(WebSocket.NORMAL_CLOSURE, "ok");
}
private void ping() {
try {
socket.sendText(MAPPER.writeValueAsString(Message.PING), true);
log.debug("Sending message: {}", Message.PING);
} catch (IOException ex) {
log.error("Failed to send ping", ex);
}
}
@Override
public @NotNull SocketState getState() {
return stopped ? SocketState.STOPPING : SocketState.CONNECTED;
}
}
public final class Cooldown implements ChatClientState {
private final int cooldown;
private Instant until;
private ScheduledFuture<?> future;
public Cooldown(int cooldown) {
this.cooldown = cooldown;
}
@Override
public void onEnter() {
log.info("restarting socket {} in {} seconds", channel, cooldown);
var nextCooldown = Math.min(3600, cooldown * 2);
this.until = Instant.now().plusSeconds(cooldown);
this.future = SCHEDULER.schedule(() -> transition(this, new Connecting(nextCooldown)), cooldown, TimeUnit.SECONDS);
}
@Override
public void abort() {
log.info("stopping socket {}", channel);
future.cancel(true);
transition(this, new Stopped());
}
@Override
public void restart() {
future.cancel(true);
transition(this, new Connecting());
}
@Override
public @NotNull SocketState getState() {
return SocketState.COOLDOWN;
}
}
public final class Stopped implements ChatClientState {
@Override
public void onEnter() {
log.info("stopped socket {}", channel);
stopped.countDown();
sockets.remove(channel, ChatClient.this);
SocketSupport.unregister(channel);
}
@Override
public void abort() {}
@Override
public @NotNull SocketState getState() {
return SocketState.STOPPED;
}
}
}
}

View File

@@ -0,0 +1,28 @@
package eu.jonahbauer.chat.server.util;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
public enum UrlTemplateProcessor implements StringTemplate.Processor<String, RuntimeException> {
URL;
@Override
public String process(StringTemplate template) throws RuntimeException {
var out = new StringBuilder();
var fragments = template.fragments();
var values = template.values();
for (int i = 0, length = values.size(); i < length; i++) {
out.append(fragments.get(i));
var value = values.get(i);
if (value != null) {
out.append(URLEncoder.encode(value.toString(), StandardCharsets.UTF_8));
}
}
out.append(fragments.getLast());
return out.toString();
}
}

View File

@@ -0,0 +1,17 @@
module eu.jonahbauer.chat.server {
exports eu.jonahbauer.chat.server;
opens eu.jonahbauer.chat.server.bot to com.fasterxml.jackson.databind;
opens eu.jonahbauer.chat.server.socket to com.fasterxml.jackson.databind;
requires com.fasterxml.jackson.core;
requires com.fasterxml.jackson.databind;
requires com.fasterxml.jackson.datatype.jsr310;
requires eu.jonahbauer.chat.bot.api;
requires eu.jonahbauer.chat.server.management;
requires java.management;
requires java.net.http;
requires org.apache.logging.log4j;
requires org.jetbrains.annotations;
requires static lombok;
}

View File

@@ -0,0 +1,12 @@
<Configuration>
<Appenders>
<Console name="console" target="SYSTEM_OUT">
<PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} %highlight{%-5level} [%15.15t] %style{%c{1}}{cyan} : %msg%n" />
</Console>
</Appenders>
<Loggers>
<Root level="info" additivity="false">
<AppenderRef ref="console" />
</Root>
</Loggers>
</Configuration>