Skip to content

Commit

Permalink
Bump to 0.2.0, rename entry points and add run state for stability
Browse files Browse the repository at this point in the history
  • Loading branch information
sdb9696 committed Oct 31, 2023
1 parent 87daa6b commit e3cbfda
Show file tree
Hide file tree
Showing 8 changed files with 188 additions and 105 deletions.
2 changes: 1 addition & 1 deletion README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ python::

# Notify the service you're connecting to of your FCM token

pc.connect(YOUR_NOTIFICATION_CALLBACK)
pc.start(YOUR_NOTIFICATION_CALLBACK)


Attribution
Expand Down
4 changes: 2 additions & 2 deletions firebase_messaging/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
from .fcmpushclient import FcmPushClientConfig, FcmPushClient
from .fcmpushclient import FcmPushClientConfig, FcmPushClient, FcmPushClientRunState

__all__ = ["FcmPushClientConfig", "FcmPushClient"]
__all__ = ["FcmPushClientConfig", "FcmPushClient", "FcmPushClientRunState"]
162 changes: 102 additions & 60 deletions firebase_messaging/fcmpushclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,26 @@ class ErrorType(Enum):
NOTIFY = 4


class FcmPushClientRunState(Enum):
CREATED = (1,)
STARTING_TASKS = (2,)
STARTING_CONNECTION = (3,)
STARTING_LOGIN = (4,)
STARTED = (5,)
RESETTING = (6,)
STOPPING = (7,)
STOPPED = (8,)


@dataclass
class FcmPushClientConfig: # pylint:disable=too-many-instance-attributes
"""Class to provide configuration to
:class:`firebase_messaging.FcmPushClientConfig`.FcmPushClient."""

server_heartbeat_interval: Optional[int] = None
server_heartbeat_interval: Optional[int] = 10
"""Time in seconds to request the server to send heartbeats"""

client_heartbeat_interval: Optional[int] = None
client_heartbeat_interval: Optional[int] = 20
"""Time in seconds to send heartbeats to the server"""

send_selective_acknowledgements: bool = True
Expand Down Expand Up @@ -121,7 +132,6 @@ def __init__(
self.reader = None
self.writer = None
self.do_listen = False
self.logged_in = False
self.sequential_error_counters = {}
self.log_warn_counters = {}

Expand All @@ -132,8 +142,7 @@ def __init__(
self.last_login_time = None
self.last_message_time = None

self.is_resetting = False
self.is_stopping = False
self.run_state: FcmPushClientRunState = FcmPushClientRunState.CREATED
self.tasks = None

self.listen_event_loop = None
Expand Down Expand Up @@ -178,39 +187,36 @@ async def _do_writer_close(self):
_logger.debug("%s Error while trying to close writer", type(e).__name__)

async def _reset(self):
if self.reset_lock.locked() or not self.do_listen:
if (
self.reset_lock.locked()
or self.stopping_lock.locked()
or not self.do_listen
):
return

async with self.reset_lock:
_logger.debug("Resetting connection")
try:
self.is_resetting = True
self.logged_in = False
now = time.time()
time_since_last_login = now - self.last_login_time
if time_since_last_login < self.config.reset_interval:
_logger.debug(
"%ss since last reset attempt.", time_since_last_login
)
await asyncio.sleep(
self.config.reset_interval - time_since_last_login
)

await self._do_writer_close()
self.run_state = FcmPushClientRunState.RESETTING
now = time.time()
time_since_last_login = now - self.last_login_time
if time_since_last_login < self.config.reset_interval:
_logger.debug("%ss since last reset attempt.", time_since_last_login)
await asyncio.sleep(self.config.reset_interval - time_since_last_login)

_logger.debug("Reestablishing connection")
if not await self._connect_with_retry():
_logger.error(
"Unable to connect to MCS endpoint "
+ "after %s tries, shutting down"
)
self._terminate()
return
_logger.debug("Re-connected to ssl socket")
await self._do_writer_close()

await self._login()
finally:
self.is_resetting = False
_logger.debug("Reestablishing connection")
if not await self._connect_with_retry():
_logger.error(
"Unable to connect to MCS endpoint "
+ "after %s tries, shutting down"
)
self._terminate()
return
_logger.debug("Re-connected to ssl socket")

await self._login()

# protobuf variable length integers are encoded in base 128
# each byte contains 7 bits of the integer and the msb is set if there's
Expand Down Expand Up @@ -299,6 +305,8 @@ async def _receive_msg(self):
return payload

async def _login(self):
self.run_state = FcmPushClientRunState.STARTING_LOGIN

now = time.time()
self.input_stream_id = 0
self.last_input_stream_id_reported = -1
Expand Down Expand Up @@ -419,9 +427,9 @@ def _handle_data_message(self, callback, msg, obj):
try:
callback(ret_val, msg.persistent_id, obj)
self._reset_error_count(ErrorType.NOTIFY)
except Exception as ex:
_logger.error(
"Unexpected exception calling notification callback: %s", ex
except Exception:
_logger.exception(
"Unexpected exception calling notification callback\n"
)
self._try_increment_error_count(ErrorType.NOTIFY)

Expand Down Expand Up @@ -480,6 +488,8 @@ async def _send_heartbeat(self):
_logger.debug("Sent heartbeat ping")

def _terminate(self):
self.run_state = FcmPushClientRunState.STOPPING

self.do_listen = False
current_task = asyncio.current_task()
for task in self.tasks:
Expand All @@ -497,19 +507,32 @@ async def _do_monitor(self, callback):
self._terminate()
return

if self.logged_in and self.config.client_heartbeat_interval:
now = time.time()
if (
self.last_message_time + self.config.client_heartbeat_interval < now
and not self.is_resetting
):
await self._send_heartbeat()
await asyncio.sleep(self.config.heartbeat_ack_timeout)
if self.run_state == FcmPushClientRunState.STARTED:
# if server_heartbeat_interval is set and less than
# client_heartbeat_interval then the last_message_time
# will be within the client window if connected
if self.config.client_heartbeat_interval:
now = time.time()
if (
self.last_message_time + self.config.client_heartbeat_interval
< now
and self.do_listen
):
await self._send_heartbeat()
await asyncio.sleep(self.config.heartbeat_ack_timeout)
now = time.time()
if ( # Check state hasn't changed during sleep
self.last_message_time
+ self.config.client_heartbeat_interval
< now
and self.do_listen
and self.run_state == FcmPushClientRunState.STARTED
):
await self._reset()
elif self.config.server_heartbeat_interval:
now = time.time()
if ( # We give the server 2 extra seconds
self.last_message_time + self.config.server_heartbeat_interval
< now - 2
):
await self._reset()

Expand Down Expand Up @@ -554,7 +577,7 @@ async def _handle_message(self, msg, callback, obj):
else:
_logger.info("Succesfully logged in to MCS endpoint")
self._reset_error_count(ErrorType.LOGIN)
self.logged_in = True
self.run_state = FcmPushClientRunState.STARTED
self.persistent_ids = []
return

Expand Down Expand Up @@ -596,6 +619,8 @@ async def _connect(self):
return False

async def _connect_with_retry(self):
self.run_state = FcmPushClientRunState.STARTING_CONNECTION

trycount = 0
connected = False
while (
Expand Down Expand Up @@ -644,12 +669,12 @@ async def _listen(self, callback, obj=None): # pylint: disable=too-many-branche
self._terminate()
return
try:
if self.is_resetting:
if self.run_state == FcmPushClientRunState.RESETTING:
await asyncio.sleep(1)
elif msg := await self._receive_msg():
await self._handle_message(msg, callback, obj)

except OSError as osex:
except (OSError, EOFError) as osex:
if (
isinstance(
osex,
Expand All @@ -660,7 +685,7 @@ async def _listen(self, callback, obj=None): # pylint: disable=too-many-branche
SSLError,
),
)
and self.is_resetting
and self.run_state == FcmPushClientRunState.RESETTING
):
if (
isinstance(osex, SSLError) # pylint: disable=no-member
Expand All @@ -676,7 +701,7 @@ async def _listen(self, callback, obj=None): # pylint: disable=too-many-branche
type(osex).__name__,
)
else:
_logger.error("Unexpected exception during read: %s", osex)
_logger.exception("Unexpected exception during read\n")
if self._try_increment_error_count(ErrorType.CONNECTION):
await self._reset()

Expand All @@ -692,13 +717,11 @@ async def _listen(self, callback, obj=None): # pylint: disable=too-many-branche
finally:
await self._do_writer_close()

def _signal_handler(self):
self.disconnect()

async def _run_tasks(self, callback, obj):
self.reset_lock = asyncio.Lock()
self.stopping_lock = asyncio.Lock()
self.do_listen = True
self.run_state = FcmPushClientRunState.STARTING_TASKS
try:
self.tasks = [
asyncio.create_task(self._listen(callback, obj)),
Expand All @@ -717,6 +740,9 @@ def _start_on_new_loop(self, callback, obj):
asyncio.set_event_loop(self.listen_event_loop)
self.listen_event_loop.run_until_complete(self._run_tasks(callback, obj))

def _start_on_existing_loop(self, callback, obj):
self.listen_event_loop.create_task(self._run_tasks(callback, obj))

@staticmethod
def _wrapped_callback(
fcm_client_loop,
Expand All @@ -732,8 +758,8 @@ def _wrapped_callback(
try:
callback(notification, persistent_id, obj)
fcm_client_loop.call_soon_threadsafe(on_success)
except Exception as ex:
_logger.error("Unexpected exception calling notification callback: %s", ex)
except Exception:
_logger.exception("Unexpected exception calling notification callback\n")
fcm_client_loop.call_soon_threadsafe(on_error)

def checkin(self, sender_id: int, app_id: str) -> str:
Expand Down Expand Up @@ -761,7 +787,7 @@ def checkin(self, sender_id: int, app_id: str) -> str:

return self.credentials["fcm"]["token"]

def connect(
def start(
self,
callback: Optional[Callable[[dict, str, Optional[Any]], None]],
obj: Any = None,
Expand Down Expand Up @@ -792,7 +818,17 @@ def connect(
if self.listen_event_loop:
if not self.callback_event_loop:
self.callback_event_loop = self.listen_event_loop
self.listen_event_loop.create_task(self._run_tasks(callback, obj))
try:
loop = asyncio.get_running_loop()
except RuntimeError:
loop = None

if loop == self.listen_event_loop:
self._start_on_existing_loop(callback, obj)
else:
self.listen_event_loop.call_soon_threadsafe(
self._start_on_existing_loop, callback, obj
)
else:
self.fcm_thread = Thread(
target=self._start_on_new_loop,
Expand All @@ -803,12 +839,15 @@ def connect(
self.fcm_thread.start()

async def _stop_connection(self):
if self.stopping_lock.locked() or self.is_stopping:
if self.stopping_lock.locked() or self.run_state in (
FcmPushClientRunState.STOPPING,
FcmPushClientRunState.STOPPED,
):
return

async with self.stopping_lock:
try:
self.is_stopping = True
self.run_state = FcmPushClientRunState.STOPPING

self.do_listen = False

Expand All @@ -817,11 +856,14 @@ async def _stop_connection(self):
task.cancel()

finally:
self.is_stopping = False
self.run_state = FcmPushClientRunState.STOPPED
self.fcm_thread = None
self.listen_event_loop = None

def disconnect(self):
def is_started(self):
return self.run_state == FcmPushClientRunState.STARTED

def stop(self):
"""Disconnects from FCM and shuts down the service thread."""
if self.fcm_thread:
if (
Expand Down Expand Up @@ -888,4 +930,4 @@ def send_message(self, raw_data, persistent_id):

def __del__(self):
if self.listen_event_loop and self.listen_event_loop.is_running():
self.disconnect()
self.stop()
Loading

0 comments on commit e3cbfda

Please sign in to comment.