...
 
Commits (3)
......@@ -1066,7 +1066,7 @@ public class Manager {
File cacheFile = getMessageCacheFile(envelope, now);
storeEnvelope(envelope, cacheFile);
} catch (IOException e) {
logger.warn("Failed to store encrypted message in disk cache, ignoring: " + e.getMessage());
logger.warn("Failed to store encrypted message in disk cache: " + e.getMessage());
}
}
});
......@@ -1075,7 +1075,7 @@ public class Manager {
return;
continue;
} catch (InvalidVersionException e) {
logger.info("Ignoring error: " + e.getMessage());
logger.warn("Ignoring invalid version exception because I haven't figured out why this might happen. file an issue if you want to help: " + e.getMessage());
continue;
}
if (!envelope.isReceipt()) {
......
......@@ -54,15 +54,23 @@ public class MessageReceiver implements Manager.ReceiveMessageHandler, Runnable
receivers.get(username).subscribe(socket);
}
public synchronized static void resubscribe(String username, List<Socket> sockets) {
logger.info("Restarting MessageReceiver and reconnecting " + sockets.size() + " socket(s)");
MessageReceiver receiver = new MessageReceiver(username);
receivers.put(username, receiver);
Thread messageReceiverThread = new Thread(receiver);
messageReceiverThread.start();
for(Socket s : sockets) {
receivers.get(username).subscribe(s);
}
}
public synchronized static void unsubscribe(String username, Socket socket) {
MessageReceiver r = receivers.get(username);
r.removeSocket(socket);
logger.debug("Unsubscribed socket from user " + Util.redact(username) + " - " + r.sockets.size() + " client(s) connected.");
if (!r.hasConnectedSockets()) {
try {
Manager.get(r.username).shutdownMessagePipe();
} catch(IOException | NoSuchAccountException e) {
logger.error("Error while shutting down message pipe from signal", e);
}
receivers.remove(username);
}
}
......@@ -91,20 +99,16 @@ public class MessageReceiver implements Manager.ReceiveMessageHandler, Runnable
private synchronized void removeSocket(Socket s) {
if(sockets.remove(s) && !hasConnectedSockets()) {
logger.info("Last client for " + Util.redact(this.username) + " unsubscribed, shutting down message pipe!");
logger.info("Last client for " + Util.redact(this.username) + " unsubscribed, shutting down connection to server");
try {
Manager.get(username).shutdownMessagePipe();
} catch (IOException | NoSuchAccountException e) {
logger.catching(e);
logger.error("error shutting down connection to server: ", e);
}
}
}
private synchronized boolean hasConnectedSockets() {
if(sockets.size() == 0) {
logger.info("Last client for " + Util.redact(username) + ", closing connection to server");
}
return sockets.size() > 0;
}
......@@ -126,11 +130,27 @@ public class MessageReceiver implements Manager.ReceiveMessageHandler, Runnable
}
} catch (AssertionError e) {
logger.catching(e);
} finally {
manager.shutdownMessagePipe();
}
}
} catch(Exception e) {
logger.catching(e);
}
boolean restart = false;
synchronized(this) {
if(sockets.size() == 0) {
logger.debug("Removing receiver for " + Util.redact(username));
receivers.remove(username);
} else {
logger.warn("message receiver shut down but there are sockets connected!");
restart = true;
}
}
if(restart) {
sockets.resubscribe(username);
}
}
@Override
......
......@@ -20,6 +20,8 @@ package io.finn.signald;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.finn.signald.clientprotocol.v0.JsonMessageWrapper;
import io.finn.signald.util.JSONUtil;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.IOException;
import java.io.PrintWriter;
......@@ -32,6 +34,7 @@ import java.util.List;
class SocketManager {
private static final ObjectMapper mapper = JSONUtil.GetMapper();
private static final Logger log = LogManager.getLogger(SocketManager.class.getName());
private final List<Socket> sockets = Collections.synchronizedList(new ArrayList<>());
......@@ -51,7 +54,6 @@ class SocketManager {
public synchronized void broadcast(JsonMessageWrapper message) throws IOException {
String jsonMessage = mapper.writeValueAsString(message);
synchronized(this.sockets) {
Iterator i = this.sockets.iterator();
while(i.hasNext()) {
Socket s = (Socket)i.next();
......@@ -61,6 +63,11 @@ class SocketManager {
new PrintWriter(s.getOutputStream(), true).println(jsonMessage);
}
}
}
}
public synchronized void resubscribe(String username) {
List<Socket> socketsToResubscribe = new ArrayList<>();
socketsToResubscribe.addAll(sockets);
MessageReceiver.resubscribe(username, socketsToResubscribe);
}
}