EventBus.java

package com.workbenchclassic;

import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import javax.json.JsonObject;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.sse.Sse;
import javax.ws.rs.sse.SseEventSink;

public class EventBus {

    private static final Map<String, Set<Client>> subscribers =
            new ConcurrentHashMap<>();

    public static void register(String machineID, SseEventSink sink, Sse sse) {
        subscribers
                .computeIfAbsent(machineID, k -> ConcurrentHashMap.newKeySet())
                .add(new Client(sink, sse));
    }

    public static void publish(String machineID, JsonObject message) {
        Set<Client> clients = subscribers.get(machineID);
        if (clients == null) return;

        clients.removeIf(client -> {
            if (client.sink.isClosed()) {
                return true; // remove closed connection
            }

            try {
                client.sink.send(
                        client.sse.newEventBuilder()
                                .mediaType(MediaType.APPLICATION_JSON_TYPE)
                                .data(JsonObject.class, message)
                                .build()
                );
                return false;
            } catch (Exception e) {
                // Verbindung tot → entfernen
                try {
                    client.sink.close();
                } catch (Exception ignore) {}
                return true;
            }
        });

        if (clients.isEmpty()) {
            subscribers.remove(machineID);
        }
    }

    private static class Client {
        final SseEventSink sink;
        final Sse sse;

        Client(SseEventSink sink, Sse sse) {
            this.sink = sink;
            this.sse = sse;
        }
    }
}