diff --git a/Sources/SwiftCentrifuge/Client.swift b/Sources/SwiftCentrifuge/Client.swift index 8753254..403b75e 100644 --- a/Sources/SwiftCentrifuge/Client.swift +++ b/Sources/SwiftCentrifuge/Client.swift @@ -26,7 +26,7 @@ public struct CentrifugeClientConfig { public var maxReconnectDelay = 20.0 public var privateChannelPrefix = "$" public var pingInterval = 25.0 - + public init() {} } @@ -38,15 +38,15 @@ public enum CentrifugeClientStatus { public class CentrifugeClient { public weak var delegate: CentrifugeClientDelegate? - - //MARK - + + // MARK - fileprivate(set) var url: String fileprivate(set) var delegateQueue: OperationQueue fileprivate(set) var syncQueue: DispatchQueue fileprivate(set) var config: CentrifugeClientConfig - - //MARK - - fileprivate(set) var status: CentrifugeClientStatus = .new + + // MARK - + public fileprivate(set) var status: CentrifugeClientStatus = .new fileprivate var conn: WebSocket? fileprivate var token: String? fileprivate var client: String? @@ -62,7 +62,7 @@ public class CentrifugeClient { fileprivate var disconnectOpts: CentrifugeDisconnectOptions? fileprivate var refreshTask: DispatchWorkItem? fileprivate var connecting = false - + /// Initialize client. /// /// - Parameters: @@ -74,10 +74,10 @@ public class CentrifugeClient { self.url = url self.config = config self.delegate = delegate - + let queueID = UUID().uuidString self.syncQueue = DispatchQueue(label: "com.centrifugal.centrifuge-swift.sync<\(queueID)>") - + if let _queue = delegateQueue { self.delegateQueue = _queue } else { @@ -85,7 +85,7 @@ public class CentrifugeClient { self.delegateQueue.maxConcurrentOperationCount = 1 } } - + /** Set connection JWT - parameter token: String @@ -96,7 +96,7 @@ public class CentrifugeClient { strongSelf.token = token } } - + /** Publish message Data to channel - parameter channel: String channel name @@ -118,7 +118,7 @@ public class CentrifugeClient { }) } } - + /** Send raw message to server - parameter data: Data @@ -137,7 +137,7 @@ public class CentrifugeClient { }) } } - + /** Send RPC command - parameter data: Data @@ -162,7 +162,7 @@ public class CentrifugeClient { }) } } - + /** Connect to server */ @@ -195,7 +195,7 @@ public class CentrifugeClient { } catch {} } strongSelf.onClose(serverDisconnect: serverDisconnect) - + } ws.onData = { [weak self] data in guard let strongSelf = self else { return } @@ -205,7 +205,7 @@ public class CentrifugeClient { strongSelf.conn?.connect() } } - + /** Disconnect from server */ @@ -216,7 +216,7 @@ public class CentrifugeClient { strongSelf.close(reason: "clean disconnect", reconnect: false) } } - + /** Create subscription object to specific channel with delegate - parameter channel: String @@ -234,7 +234,7 @@ public class CentrifugeClient { } internal extension CentrifugeClient { - + func refreshWithToken(token: String) { self.syncQueue.async { [weak self] in guard let strongSelf = self else { return } @@ -252,7 +252,7 @@ internal extension CentrifugeClient { }) } } - + func getSubscriptionToken(channel: String, completion: @escaping (String)->()) { self.syncQueue.async { [weak self] in guard let strongSelf = self else { return } @@ -273,7 +273,7 @@ internal extension CentrifugeClient { } } } - + func unsubscribe(sub: CentrifugeSubscription) { let channel = sub.channel self.syncQueue.async { [weak self] in @@ -286,7 +286,7 @@ internal extension CentrifugeClient { } } } - + func resubscribe() { subscriptionsLock.lock() for sub in self.subscriptions { @@ -294,11 +294,11 @@ internal extension CentrifugeClient { } subscriptionsLock.unlock() } - + func subscribe(channel: String, token: String, completion: @escaping (Proto_SubscribeResult?, Error?)->()) { self.sendSubscribe(channel: channel, token: token, completion: completion) } - + func presence(channel: String, completion: @escaping ([String: CentrifugeClientInfo]?, Error?)->()) { self.syncQueue.async { [weak self] in guard let strongSelf = self else { return } @@ -306,7 +306,7 @@ internal extension CentrifugeClient { strongSelf.sendPresence(channel: channel, completion: completion) } } - + func presenceStats(channel: String, completion: @escaping (CentrifugePresenceStats?, Error?)->()) { self.syncQueue.async { [weak self] in guard let strongSelf = self else { return } @@ -314,7 +314,7 @@ internal extension CentrifugeClient { strongSelf.sendPresenceStats(channel: channel, completion: completion) } } - + func history(channel: String, completion: @escaping ([CentrifugePublication]?, Error?)->()) { self.syncQueue.async { [weak self] in guard let strongSelf = self else { return } @@ -322,7 +322,7 @@ internal extension CentrifugeClient { strongSelf.sendHistory(channel: channel, completion: completion) } } - + func close(reason: String, reconnect: Bool) { self.disconnectOpts = CentrifugeDisconnectOptions(reason: reason, reconnect: reconnect) self.conn?.disconnect() @@ -365,7 +365,7 @@ fileprivate extension CentrifugeClient { return } } - + if let result = res { strongSelf.connecting = false strongSelf.status = .connected @@ -388,14 +388,14 @@ fileprivate extension CentrifugeClient { }) } } - + func onData(data: Data) { self.syncQueue.async { [weak self] in guard let strongSelf = self else { return } strongSelf.handleData(data: data as Data) } } - + func onClose(serverDisconnect: CentrifugeDisconnectOptions?) { self.syncQueue.async { [weak self] in guard let strongSelf = self else { return } @@ -410,7 +410,7 @@ fileprivate extension CentrifugeClient { strongSelf.scheduleDisconnect(reason: disconnect.reason, reconnect: disconnect.reconnect) } } - + private func nextCommandId() -> UInt32 { self.commandIdLock.lock() self.commandId += 1 @@ -418,7 +418,7 @@ fileprivate extension CentrifugeClient { self.commandIdLock.unlock() return cid } - + private func newCommand(method: Proto_MethodType, params: Data) -> Proto_Command { var command = Proto_Command() let nextId = self.nextCommandId() @@ -427,7 +427,7 @@ fileprivate extension CentrifugeClient { command.params = params return command } - + private func sendCommand(command: Proto_Command, completion: @escaping (Proto_Reply?, Error?)->()) { self.syncQueue.async { let commands: [Proto_Command] = [command] @@ -441,23 +441,23 @@ fileprivate extension CentrifugeClient { } } } - + private func sendCommandAsync(command: Proto_Command) throws { let commands: [Proto_Command] = [command] let data = try CentrifugeSerializer.serializeCommands(commands: commands) self.conn?.write(data: data) } - + private func waitForReply(id: UInt32, completion: @escaping (Proto_Reply?, Error?)->()) { let timeoutTask = DispatchWorkItem { [weak self] in self?.opCallbacks[id] = nil completion(nil, CentrifugeError.timeout) } self.syncQueue.asyncAfter(deadline: .now() + self.config.timeout, execute: timeoutTask) - + self.opCallbacks[id] = { [weak self] rep in timeoutTask.cancel() - + self?.opCallbacks[id] = nil if let err = rep.error { @@ -467,7 +467,7 @@ fileprivate extension CentrifugeClient { } } } - + private func waitForConnect(completion: @escaping (Error?)->()) { if !self.needReconnect { completion(CentrifugeError.disconnected) @@ -477,21 +477,21 @@ fileprivate extension CentrifugeClient { completion(nil) return } - + let uid = UUID().uuidString - + let timeoutTask = DispatchWorkItem { [weak self] in self?.connectCallbacks[uid] = nil completion(CentrifugeError.timeout) } self.syncQueue.asyncAfter(deadline: .now() + self.config.timeout, execute: timeoutTask) - + self.connectCallbacks[uid] = { error in timeoutTask.cancel() completion(error) } } - + private func scheduleReconnect() { self.syncQueue.async { [weak self] in guard let strongSelf = self else { return } @@ -512,7 +512,7 @@ fileprivate extension CentrifugeClient { }) } } - + private func handleAsyncData(data: Data) throws { let push = try Proto_Push(serializedData: data) let channel = push.channel @@ -573,7 +573,7 @@ fileprivate extension CentrifugeClient { } } } - + private func handleData(data: Data) { guard let replies = try? CentrifugeSerializer.deserializeCommands(data: data) else { return } for reply in replies { @@ -584,7 +584,7 @@ fileprivate extension CentrifugeClient { } } } - + private func startPing() { if self.config.pingInterval == 0 { return @@ -616,11 +616,11 @@ fileprivate extension CentrifugeClient { self.pingTimer?.schedule(deadline: .now() + self.config.pingInterval, repeating: self.config.pingInterval) self.pingTimer?.resume() } - + private func stopPing() { self.pingTimer?.cancel() } - + private func startConnectionRefresh(ttl: UInt32) { let refreshTask = DispatchWorkItem { self.delegateQueue.addOperation { @@ -637,36 +637,36 @@ fileprivate extension CentrifugeClient { self.syncQueue.asyncAfter(deadline: .now() + Double(ttl), execute: refreshTask) self.refreshTask = refreshTask } - + private func stopConnectionRefresh() { self.refreshTask?.cancel() } - + private func scheduleDisconnect(reason: String, reconnect: Bool) { let previousStatus = self.status self.status = .disconnected self.client = nil - + for resolveFunc in self.opCallbacks.values { resolveFunc(CentrifugeResolveData(error: CentrifugeError.disconnected, reply: nil)) } self.opCallbacks.removeAll(keepingCapacity: true) - + for resolveFunc in self.connectCallbacks.values { resolveFunc(CentrifugeError.disconnected) } self.connectCallbacks.removeAll(keepingCapacity: true) - + subscriptionsLock.lock() for sub in self.subscriptions { sub.unsubscribeOnDisconnect() } subscriptionsLock.unlock() - + self.stopPing() - + self.stopConnectionRefresh() - + if previousStatus == .new || previousStatus == .connected { self.delegateQueue.addOperation { [weak self] in guard let strongSelf = self else { return } @@ -676,12 +676,12 @@ fileprivate extension CentrifugeClient { ) } } - + if reconnect { self.scheduleReconnect() } } - + private func sendConnect(completion: @escaping (Proto_ConnectResult?, Error?)->()) { var params = Proto_ConnectRequest() if self.token != nil { @@ -713,7 +713,7 @@ fileprivate extension CentrifugeClient { completion(nil, error) } } - + private func sendRefresh(token: String, completion: @escaping (Proto_RefreshResult?, Error?)->()) { var params = Proto_RefreshRequest() params.token = token @@ -743,7 +743,7 @@ fileprivate extension CentrifugeClient { completion(nil, error) } } - + private func sendUnsubscribe(channel: String, completion: @escaping (Proto_UnsubscribeResult?, Error?)->()) { var params = Proto_UnsubscribeRequest() params.channel = channel @@ -773,7 +773,7 @@ fileprivate extension CentrifugeClient { completion(nil, error) } } - + private func sendSubscribe(channel: String, token: String, completion: @escaping (Proto_SubscribeResult?, Error?)->()) { var params = Proto_SubscribeRequest() params.channel = channel @@ -806,7 +806,7 @@ fileprivate extension CentrifugeClient { completion(nil, error) } } - + private func sendPublish(channel: String, data: Data, completion: @escaping (Proto_PublishResult?, Error?)->()) { var params = Proto_PublishRequest() params.channel = channel @@ -837,7 +837,7 @@ fileprivate extension CentrifugeClient { completion(nil, error) } } - + private func sendHistory(channel: String, completion: @escaping ([CentrifugePublication]?, Error?)->()) { var params = Proto_HistoryRequest() params.channel = channel @@ -871,7 +871,7 @@ fileprivate extension CentrifugeClient { completion(nil, error) } } - + private func sendPresence(channel: String, completion: @escaping ([String:CentrifugeClientInfo]?, Error?)->()) { var params = Proto_PresenceRequest() params.channel = channel @@ -905,7 +905,7 @@ fileprivate extension CentrifugeClient { completion(nil, error) } } - + private func sendPresenceStats(channel: String, completion: @escaping (CentrifugePresenceStats?, Error?)->()) { var params = Proto_PresenceStatsRequest() params.channel = channel @@ -936,7 +936,7 @@ fileprivate extension CentrifugeClient { completion(nil, error) } } - + private func sendRPC(data: Data, completion: @escaping (Proto_RPCResult?, Error?)->()) { var params = Proto_RPCRequest() params.data = data @@ -966,7 +966,7 @@ fileprivate extension CentrifugeClient { completion(nil, error) } } - + private func sendSend(data: Data, completion: @escaping (Error?)->()) { var params = Proto_SendRequest() params.data = data diff --git a/Sources/SwiftCentrifuge/Helpers.swift b/Sources/SwiftCentrifuge/Helpers.swift index 40c2f73..00bd8d2 100644 --- a/Sources/SwiftCentrifuge/Helpers.swift +++ b/Sources/SwiftCentrifuge/Helpers.swift @@ -29,7 +29,7 @@ internal class CentrifugeSerializer { stream.close() return stream.property(forKey: .dataWrittenToMemoryStreamKey) as! Data } - + class func deserializeCommands(data: Data) throws -> [Proto_Reply] { var commands = [Proto_Reply]() let stream = InputStream(data: data as Data) diff --git a/Sources/SwiftCentrifuge/Subscription.swift b/Sources/SwiftCentrifuge/Subscription.swift index 9cdd4be..899b714 100644 --- a/Sources/SwiftCentrifuge/Subscription.swift +++ b/Sources/SwiftCentrifuge/Subscription.swift @@ -16,19 +16,19 @@ public enum CentrifugeSubscriptionStatus { } public class CentrifugeSubscription { - + public let channel: String - + private var status: CentrifugeSubscriptionStatus = .unsubscribed private var isResubscribe = false private var needResubscribe = true - + weak var delegate: CentrifugeSubscriptionDelegate? - + private var callbacks: [String: ((Error?) -> ())] = [:] private let syncQueue: DispatchQueue private weak var centrifuge: CentrifugeClient? - + init(centrifuge: CentrifugeClient, channel: String, delegate: CentrifugeSubscriptionDelegate) { self.centrifuge = centrifuge self.channel = channel @@ -36,7 +36,7 @@ public class CentrifugeSubscription { self.isResubscribe = false self.syncQueue = DispatchQueue(label: "com.centrifugal.centrifuge-swift.sync<\(UUID().uuidString)>") } - + public func subscribe() { self.syncQueue.async { [weak self] in guard @@ -50,7 +50,7 @@ public class CentrifugeSubscription { } } } - + public func publish(data: Data, completion: @escaping (Error?) -> ()) { self.waitForSubscribe(completion: { [weak self, channel = self.channel] error in if let err = error { @@ -60,7 +60,7 @@ public class CentrifugeSubscription { } }) } - + public func presence(completion: @escaping ([String: CentrifugeClientInfo]?, Error?) -> ()) { self.waitForSubscribe(completion: { [weak self, channel = self.channel] error in if let err = error { @@ -70,7 +70,7 @@ public class CentrifugeSubscription { } }) } - + public func presenceStats(completion: @escaping (CentrifugePresenceStats?, Error?) -> ()) { self.waitForSubscribe(completion: { [weak self, channel = self.channel] error in if let err = error { @@ -80,7 +80,7 @@ public class CentrifugeSubscription { } }) } - + public func history(completion: @escaping ([CentrifugePublication]?, Error?) -> ()) { self.waitForSubscribe(completion: { [weak self, channel = self.channel] error in if let err = error { @@ -90,7 +90,7 @@ public class CentrifugeSubscription { } }) } - + func sendSubscribe(channel: String, token: String) { self.centrifuge?.subscribe(channel: self.channel, token: token, completion: { [weak self, weak centrifuge = self.centrifuge] res, error in guard let centrifuge = centrifuge else { return } @@ -110,11 +110,11 @@ public class CentrifugeSubscription { guard let strongSelf = self else { return } strongSelf.delegate?.onSubscribeError(strongSelf, CentrifugeSubscribeErrorEvent(code: code, message: message)) } - + for cb in strongSelf.callbacks.values { cb(CentrifugeError.replyError(code: code, message: message)) } - + strongSelf.callbacks.removeAll(keepingCapacity: true) } case CentrifugeError.timeout: @@ -148,7 +148,7 @@ public class CentrifugeSubscription { } }) } - + func resubscribeIfNecessary() { self.syncQueue.async { [weak self] in guard let strongSelf = self else { return } @@ -158,7 +158,7 @@ public class CentrifugeSubscription { } } } - + func resubscribe() { guard let centrifuge = self.centrifuge else { return } if self.channel.hasPrefix(centrifuge.config.privateChannelPrefix) { @@ -176,38 +176,38 @@ public class CentrifugeSubscription { } } } - + private func waitForSubscribe(completion: @escaping (Error?) -> ()) { self.syncQueue.async { [weak self] in guard let strongSelf = self, let timeout = strongSelf.centrifuge?.config.timeout else { return } - + if !strongSelf.needResubscribe { completion(CentrifugeError.unsubscribed) return } - + let needWait = strongSelf.status == .subscribing || (strongSelf.status == .unsubscribed && strongSelf.needResubscribe) if !needWait { completion(nil) return } - + let uid = UUID().uuidString - + let timeoutTask = DispatchWorkItem { [weak self] in self?.callbacks[uid] = nil completion(CentrifugeError.timeout) } - + strongSelf.callbacks[uid] = { error in timeoutTask.cancel() completion(error) } - + strongSelf.syncQueue.asyncAfter(deadline: .now() + timeout, execute: timeoutTask) } } - + // Access must be serialized from outside. private func moveToUnsubscribed() { if self.status != .subscribeSuccess && self.status != .subscribeError { @@ -226,14 +226,14 @@ public class CentrifugeSubscription { } } } - + func unsubscribeOnDisconnect() { self.syncQueue.sync { [weak self] in guard let strongSelf = self else { return } strongSelf.moveToUnsubscribed() } } - + public func unsubscribe() { self.syncQueue.async { [weak self] in guard let strongSelf = self else { return }