Commit 11db556f authored by Finn Herzfeld's avatar Finn Herzfeld 🌵

Fix some synchronization issues and make all JSON fields public

parent 19967b2e
Pipeline #4036 failed with stages
in 5 minutes and 32 seconds
......@@ -29,19 +29,20 @@ import java.io.IOException;
import java.net.Socket;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
public class MessageReceiver implements Manager.ReceiveMessageHandler, Runnable {
static private ConcurrentHashMap<String, MessageReceiver> receivers = new ConcurrentHashMap<>();
static private final ConcurrentHashMap<String, MessageReceiver> receivers = new ConcurrentHashMap<>();
final String username;
private SocketManager sockets;
private final SocketManager sockets;
private static final Logger logger = LogManager.getLogger();
public static void subscribe(String username, Socket socket) {
if(!receivers.containsKey(username)) {
public synchronized static void subscribe(String username, Socket socket) {
if (!receivers.containsKey(username)) {
MessageReceiver receiver = new MessageReceiver(username);
receivers.put(username, receiver);
Thread messageReceiverThread = new Thread(receiver);
......@@ -53,9 +54,26 @@ public class MessageReceiver implements Manager.ReceiveMessageHandler, Runnable
receivers.get(username).subscribe(socket);
}
public static void unsubscribe(String username, Socket socket) {
receivers.get(username).unsubscribe(socket);
receivers.remove(username);
public synchronized static void unsubscribe(String username, Socket socket) {
MessageReceiver r = receivers.get(username);
r.removeSocket(socket);
if (!r.hasConnectedSockets()) {
try {
Manager.get(r.username).shutdownMessagePipe();
} catch(IOException | NoSuchAccountException e) {
logger.error("Error while shutting down message pipe from signal", e);
}
receivers.remove(username);
}
}
public synchronized static void unsubscribe(Socket socket) {
for(Map.Entry<String, MessageReceiver> r : receivers.entrySet()) {
r.getValue().removeSocket(socket);
if(r.getValue().hasConnectedSockets()) {
receivers.remove(r.getKey());
}
}
}
private MessageReceiver(String username) {
......@@ -63,27 +81,32 @@ public class MessageReceiver implements Manager.ReceiveMessageHandler, Runnable
this.sockets = new SocketManager();
}
public static List<String> getSubscribedAccounts() {
public synchronized static List<String> getSubscribedAccounts() {
return Collections.list(receivers.keys());
}
public void subscribe(Socket s) {
this.sockets.add(s);
public synchronized void subscribe(Socket s) {
this.sockets.add(s);
}
public boolean unsubscribe(Socket s) {
boolean removed = sockets.remove(s);
if(removed && sockets.size() == 0) {
logger.info("Last client for " + this.username + " unsubscribed, shutting down message pipe!");
try {
Manager.get(username).shutdownMessagePipe();
} catch(IOException | NoSuchAccountException e) {
logger.catching(e);
}
}
return removed;
private synchronized void removeSocket(Socket s) {
if(sockets.remove(s) && !hasConnectedSockets()) {
logger.info("Last client for " + Util.redact(this.username) + " unsubscribed, shutting down message pipe!");
try {
Manager.get(username).shutdownMessagePipe();
} catch (IOException | NoSuchAccountException e) {
logger.catching(e);
}
}
}
private synchronized boolean hasConnectedSockets() {
if(sockets.size() == 0) {
logger.info("Last client for " + Util.redact(username) + ", closing connection to server");
}
return sockets.size() > 0;
}
public void run() {
try {
......@@ -96,7 +119,11 @@ public class MessageReceiver implements Manager.ReceiveMessageHandler, Runnable
try {
manager.receiveMessages((long) (timeout * 1000), TimeUnit.MILLISECONDS, returnOnTimeout, ignoreAttachments, this);
} catch (IOException e) {
logger.debug("probably harmless IOException while receiving messages:", e);
if(sockets.size() == 0) {
logger.debug("probably harmless IOException after client disconnected:", e);
} else {
throw e;
}
} catch (AssertionError e) {
logger.catching(e);
}
......
......@@ -144,12 +144,7 @@ public class SocketHandler implements Runnable {
logger.catching(e);
}
for(Map.Entry<String, MessageReceiver> entry : receivers.entrySet()) {
if(entry.getValue().unsubscribe(socket)) {
logger.info("Unsubscribed from " + entry.getKey());
receivers.remove(entry.getKey());
}
}
MessageReceiver.unsubscribe(socket);
}
}
......
......@@ -17,48 +17,40 @@
package io.finn.signald;
import java.io.PrintWriter;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.finn.signald.clientprotocol.v0.JsonMessageWrapper;
import io.finn.signald.util.JSONUtil;
import java.io.IOException;
import java.io.PrintWriter;
import java.net.Socket;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Iterator;
import java.util.ArrayList;
import java.net.Socket;
import com.fasterxml.jackson.annotation.JsonInclude.Include;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.core.JsonGenerator;
import io.finn.signald.clientprotocol.v0.JsonMessageWrapper;
import java.util.List;
class SocketManager {
private List<Socket> sockets = Collections.synchronizedList(new ArrayList<Socket>());
private ObjectMapper mpr = new ObjectMapper();
private static final ObjectMapper mapper = JSONUtil.GetMapper();
public SocketManager() {
this.mpr.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); // disable autodetect
this.mpr.setSerializationInclusion(Include.NON_NULL);
this.mpr.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
this.mpr.disable(JsonGenerator.Feature.AUTO_CLOSE_TARGET);
}
private final List<Socket> sockets = Collections.synchronizedList(new ArrayList<>());
public SocketManager() {}
public void add(Socket s) {
public synchronized void add(Socket s) {
this.sockets.add(s);
}
public boolean remove(Socket s) {
public synchronized boolean remove(Socket s) {
return this.sockets.remove(s);
}
public int size() {
public synchronized int size() {
return this.sockets.size();
}
public void broadcast(JsonMessageWrapper message) throws JsonProcessingException, IOException {
public synchronized void broadcast(JsonMessageWrapper message) throws IOException {
String jsonMessage = mapper.writeValueAsString(message);
synchronized(this.sockets) {
Iterator i = this.sockets.iterator();
while(i.hasNext()) {
......@@ -66,15 +58,9 @@ class SocketManager {
if(s.isClosed()) {
this.remove(s);
} else {
send(message, s);
new PrintWriter(s.getOutputStream(), true).println(jsonMessage);
}
}
}
}
public void send(JsonMessageWrapper message, Socket s) throws JsonProcessingException, IOException {
String jsonmessage = this.mpr.writeValueAsString(message);
PrintWriter out = new PrintWriter(s.getOutputStream(), true);
out.println(jsonmessage);
}
}
......@@ -30,18 +30,18 @@ import java.io.IOException;
public class JsonAttachment {
String contentType;
long id;
int size;
String storedFilename;
public String contentType;
public long id;
public int size;
public String storedFilename;
public String filename;
public String caption;
public int width;
public int height;
public boolean voiceNote;
String preview;
String key;
String digest;
public String preview;
public String key;
public String digest;
public String blurhash;
JsonAttachment() {}
......
......@@ -21,14 +21,14 @@ import org.whispersystems.signalservice.api.messages.calls.*;
import java.util.List;
class JsonCallMessage {
OfferMessage offerMessage;
AnswerMessage answerMessage;
BusyMessage busyMessage;
HangupMessage hangupMessage;
List<IceUpdateMessage> iceUpdateMessages;
Integer destinationDeviceId;
boolean isMultiRing;
public class JsonCallMessage {
public OfferMessage offerMessage;
public AnswerMessage answerMessage;
public BusyMessage busyMessage;
public HangupMessage hangupMessage;
public List<IceUpdateMessage> iceUpdateMessages;
public Integer destinationDeviceId;
public boolean isMultiRing;
JsonCallMessage(SignalServiceCallMessage callMessage) {
if(callMessage.getOfferMessage().isPresent()) {
......
......@@ -19,12 +19,12 @@ package io.finn.signald.clientprotocol.v0;
import org.whispersystems.signalservice.api.messages.multidevice.ConfigurationMessage;
class JsonConfigurationMessage {
public class JsonConfigurationMessage {
boolean readReceipts;
boolean unidentifiedDeliveryIndicators;
boolean typingIndicators;
boolean linkPreviews;
public boolean readReceipts;
public boolean unidentifiedDeliveryIndicators;
public boolean typingIndicators;
public boolean linkPreviews;
JsonConfigurationMessage(ConfigurationMessage verifiedMessage) {
if(verifiedMessage.getReadReceipts().isPresent()) {
......
......@@ -28,20 +28,20 @@ import java.util.ArrayList;
import java.util.List;
class JsonDataMessage {
long timestamp;
List<JsonAttachment> attachments;
String body;
JsonGroupInfo group;
boolean endSession;
int expiresInSeconds;
boolean profileKeyUpdate;
JsonQuote quote;
List<SharedContact> contacts;
List<JsonPreview> previews;
JsonSticker sticker;
boolean viewOnce;
JsonReaction reaction;
SignalServiceDataMessage.RemoteDelete remoteDelete;
public long timestamp;
public List<JsonAttachment> attachments;
public String body;
public JsonGroupInfo group;
public boolean endSession;
public int expiresInSeconds;
public boolean profileKeyUpdate;
public JsonQuote quote;
public List<SharedContact> contacts;
public List<JsonPreview> previews;
public JsonSticker sticker;
public boolean viewOnce;
public JsonReaction reaction;
public SignalServiceDataMessage.RemoteDelete remoteDelete;
JsonDataMessage(SignalServiceDataMessage dataMessage, String username) throws IOException, NoSuchAccountException {
......
......@@ -30,12 +30,12 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
class JsonGroupInfo {
String groupId;
List<JsonAddress> members;
String name;
String type;
Long avatarId;
public class JsonGroupInfo {
public String groupId;
public List<JsonAddress> members;
public String name;
public String type;
public Long avatarId;
JsonGroupInfo(SignalServiceGroupContext groupContext, String username) throws IOException, NoSuchAccountException {
SignalServiceGroup groupInfo = groupContext.getGroupV1().get();
......
......@@ -24,7 +24,7 @@ import java.util.ArrayList;
import java.util.List;
public class JsonGroupList {
List<JsonGroupInfo> groups = new ArrayList<JsonGroupInfo>();
public List<JsonGroupInfo> groups = new ArrayList<JsonGroupInfo>();
public JsonGroupList(Manager m) {
for(GroupInfo group : m.getGroups()) {
......
......@@ -30,23 +30,23 @@ import java.util.Date;
import java.util.TimeZone;
public class JsonMessageEnvelope {
String username;
String uuid;
JsonAddress source;
int sourceDevice;
String type;
String relay;
long timestamp;
String timestampISO;
long serverTimestamp;
boolean hasLegacyMessage;
boolean hasContent;
boolean isUnidentifiedSender;
JsonDataMessage dataMessage;
JsonSyncMessage syncMessage;
JsonCallMessage callMessage;
JsonReceiptMessage receipt;
JsonTypingMessage typing;
public String username;
public String uuid;
public JsonAddress source;
public int sourceDevice;
public String type;
public String relay;
public long timestamp;
public String timestampISO;
public long serverTimestamp;
public boolean hasLegacyMessage;
public boolean hasContent;
public boolean isUnidentifiedSender;
public JsonDataMessage dataMessage;
public JsonSyncMessage syncMessage;
public JsonCallMessage callMessage;
public JsonReceiptMessage receipt;
public JsonTypingMessage typing;
public JsonMessageEnvelope(SignalServiceEnvelope envelope, SignalServiceContent c, String username) throws IOException, NoSuchAccountException {
......
......@@ -21,12 +21,11 @@ import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonInclude.Include;
@JsonInclude(Include.NON_NULL)
public
class JsonMessageWrapper {
String id;
String type;
Object data;
Object error;
public class JsonMessageWrapper {
public String id;
public String type;
public Object data;
public Object error;
private JsonMessageWrapper(String type, Object data, Object error, String id) {
this.type = type;
......
......@@ -23,9 +23,9 @@ import org.whispersystems.signalservice.api.messages.SignalServiceDataMessage;
import java.io.IOException;
class JsonPreview {
String url;
String title;
JsonAttachment attachment;
public String url;
public String title;
public JsonAttachment attachment;
JsonPreview(SignalServiceDataMessage.Preview preview, String username) throws IOException, NoSuchAccountException {
url = preview.getUrl();
......
......@@ -21,11 +21,10 @@ import org.whispersystems.signalservice.api.messages.SignalServiceReceiptMessage
import java.util.List;
class JsonReceiptMessage {
String type;
List<Long> timestamps;
long when;
public class JsonReceiptMessage {
public String type;
public List<Long> timestamps;
public long when;
JsonReceiptMessage(SignalServiceReceiptMessage receiptMessage) {
type = receiptMessage.getType().name();
......
......@@ -26,13 +26,13 @@ import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
class JsonSentTranscriptMessage {
JsonAddress destination;
long timestamp;
long expirationStartTimestamp;
JsonDataMessage message;
Map<String, Boolean> unidentifiedStatus = new HashMap<>();
boolean isRecipientUpdate;
public class JsonSentTranscriptMessage {
public JsonAddress destination;
public long timestamp;
public long expirationStartTimestamp;
public JsonDataMessage message;
public Map<String, Boolean> unidentifiedStatus = new HashMap<>();
public boolean isRecipientUpdate;
JsonSentTranscriptMessage(SentTranscriptMessage s, String username) throws IOException, NoSuchAccountException {
if(s.getDestination().isPresent()) {
......
......@@ -23,11 +23,11 @@ import org.whispersystems.signalservice.api.messages.SignalServiceDataMessage;
import java.io.IOException;
class JsonSticker {
String packID;
String packKey;
int stickerID;
JsonAttachment attachment;
public class JsonSticker {
public String packID;
public String packKey;
public int stickerID;
public JsonAttachment attachment;
JsonSticker(SignalServiceDataMessage.Sticker sticker, String username) throws IOException, NoSuchAccountException {
packID = Hex.toStringCondensed(sticker.getPackId());
......
......@@ -21,9 +21,9 @@ import org.whispersystems.signalservice.api.messages.multidevice.StickerPackOper
import org.thoughtcrime.securesms.util.Hex;
class JsonStickerPackOperationMessage {
String packID;
String packKey;
String type;
public String packID;
public String packKey;
public String type;
JsonStickerPackOperationMessage(StickerPackOperationMessage message) {
if(message.getPackId().isPresent()) {
......
......@@ -27,20 +27,20 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
class JsonSyncMessage {
JsonSentTranscriptMessage sent;
JsonAttachment contacts;
boolean contactsComplete;
JsonAttachment groups;
JsonBlockedListMessage blockedList;
String request;
List<JsonReadMessage> readMessages;
JsonViewOnceOpenMessage viewOnceOpen;
JsonVerifiedMessage verified;
ConfigurationMessage configuration;
List<JsonStickerPackOperationMessage> stickerPackOperations;
String fetchType;
JsonMessageRequestResponseMessage messageRequestResponse;
public class JsonSyncMessage {
public JsonSentTranscriptMessage sent;
public JsonAttachment contacts;
public boolean contactsComplete;
public JsonAttachment groups;
public JsonBlockedListMessage blockedList;
public String request;
public List<JsonReadMessage> readMessages;
public JsonViewOnceOpenMessage viewOnceOpen;
public JsonVerifiedMessage verified;
public ConfigurationMessage configuration;
public List<JsonStickerPackOperationMessage> stickerPackOperations;
public String fetchType;
public JsonMessageRequestResponseMessage messageRequestResponse;
JsonSyncMessage(SignalServiceSyncMessage syncMessage, String username) throws IOException, NoSuchAccountException {
if(syncMessage.getSent().isPresent()) {
......
......@@ -21,10 +21,10 @@ import org.whispersystems.signalservice.api.messages.SignalServiceTypingMessage;
import org.whispersystems.util.Base64;
class JsonTypingMessage {
String action;
long timestamp;
String groupId;
public class JsonTypingMessage {
public String action;
public long timestamp;
public String groupId;
JsonTypingMessage(SignalServiceTypingMessage typingMessage) {
action = typingMessage.getAction().name();
......
......@@ -21,11 +21,11 @@ import io.finn.signald.clientprotocol.v1.JsonAddress;
import org.whispersystems.signalservice.api.messages.multidevice.VerifiedMessage;
import org.asamk.signal.util.Hex;
class JsonVerifiedMessage {
JsonAddress destination;
String identityKey;
String verified;
long timestamp;
public class JsonVerifiedMessage {
public JsonAddress destination;
public String identityKey;
public String verified;
public long timestamp;
JsonVerifiedMessage(VerifiedMessage message) {
destination = new JsonAddress(message.getDestination());
......
......@@ -20,9 +20,9 @@ package io.finn.signald.clientprotocol.v0;
import io.finn.signald.clientprotocol.v1.JsonAddress;
import org.whispersystems.signalservice.api.messages.multidevice.ViewOnceOpenMessage;
class JsonViewOnceOpenMessage {
JsonAddress sender;
long timestamp;
public class JsonViewOnceOpenMessage {
public JsonAddress sender;
public long timestamp;
JsonViewOnceOpenMessage(ViewOnceOpenMessage message) {
sender = new JsonAddress(message.getSender());
......
......@@ -25,8 +25,8 @@ import java.util.ArrayList;
import java.util.List;
public class JsonBlockedListMessage {
List<JsonAddress> addresses;
List<String> groupIds;
public List<JsonAddress> addresses;
public List<String> groupIds;
public JsonBlockedListMessage(BlockedListMessage blocklist) {
if(!blocklist.getAddresses().isEmpty()) {
addresses = new ArrayList();
......
......@@ -21,9 +21,9 @@ import org.whispersystems.signalservice.api.messages.multidevice.MessageRequestR
import org.whispersystems.util.Base64;
public class JsonMessageRequestResponseMessage {
JsonAddress person;
String groupId;
String type;
public JsonAddress person;
public String groupId;
public String type;
public JsonMessageRequestResponseMessage(MessageRequestResponseMessage m) {
if(m.getPerson().isPresent()) {
......
......@@ -21,10 +21,10 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
import org.whispersystems.signalservice.api.messages.SignalServiceDataMessage;
public class JsonReaction {
String emoji;
boolean remove;
JsonAddress targetAuthor;
long targetSentTimestamp;
public String emoji;
public boolean remove;
public JsonAddress targetAuthor;
public long targetSentTimestamp;
public JsonReaction() {}
......
......@@ -20,8 +20,8 @@ package io.finn.signald.clientprotocol.v1;
import org.whispersystems.signalservice.api.messages.multidevice.ReadMessage;
public class JsonReadMessage {
JsonAddress sender;
long timestamp;
public JsonAddress sender;
public long timestamp;
public JsonReadMessage(ReadMessage r) {
sender = new JsonAddress(r.getSender());
......
......@@ -20,11 +20,11 @@ package io.finn.signald.clientprotocol.v1;
import org.whispersystems.signalservice.api.messages.SendMessageResult;
public class JsonSendMessageResult {
JsonAddress address;
SendMessageResult.Success success;
boolean networkFailure;
boolean unregisteredFailure;
String identityFailure;
public JsonAddress address;