You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
instantqt/instantqt/__init__.py

70 lines
2.5 KiB

import os, zmq
def _notify_stream_qt(kernel, stream):
    """Hook ``stream`` into the Qt event loop owned by ``kernel.app``.

    A ``QSocketNotifier`` watching the stream's file descriptor triggers one
    kernel iteration whenever the stream reports a consumed event, and a
    zero-delay single-shot timer performs the same check once, as soon as
    the event loop is running.
    """
    from IPython.external.qt_for_kernel import QtCore

    def handle_socket_activity():
        """Run one kernel iteration if the stream had an event waiting."""
        # Flushing ensures the stream does not lose events because we are
        # consuming its edge-triggered FD. flush() returns the number of
        # events it consumed; only a non-zero count wakes the kernel.
        if stream.flush(limit=1):
            kernel.do_one_iteration()

    qt_app = kernel.app

    socket_fd = stream.getsockopt(zmq.FD)
    read_notifier = QtCore.QSocketNotifier(
        socket_fd, QtCore.QSocketNotifier.Read, qt_app
    )
    read_notifier.activated.connect(handle_socket_activity)

    # Drop every getter already registered on the kernel's message queue.
    # NOTE(review): the original author did not know why this is needed;
    # confirm before changing or removing it.
    stale_getters = kernel.msg_queue._getters
    while stale_getters:
        stale_getters.popleft()

    # Events that arrived before the notifier existed will not wake zmq's
    # edge-triggered FD, because edge-triggered notification only fires on
    # new i/o activity. Schedule one check for the moment the event loop
    # starts, so we begin in a clean state without blocking here.
    startup_timer = QtCore.QTimer(qt_app)
    startup_timer.setSingleShot(True)
    startup_timer.timeout.connect(handle_socket_activity)
    startup_timer.start(0)
def _loop_qt(app):
    """Inner loop for running the Qt event loop of ``app``.

    Pulled from guisupport.start_event_loop in IPython < 5.2,
    since IPython 5.2 only checks `get_ipython().active_eventloop` is defined,
    rather than if the eventloop is actually running.

    ``app._in_event_loop`` is set to True while ``app.exec_()`` runs and is
    reset to False afterwards, including when ``exec_()`` raises. Any such
    exception still propagates to the caller.
    """
    app._in_event_loop = True
    try:
        app.exec_()
    finally:
        # Reset on every exit path so a failed run does not leave the
        # application permanently flagged as being inside its event loop.
        app._in_event_loop = False
from ipykernel.eventloops import register_integration
@register_integration('instantqt')
def loop_qt5(kernel):
    """Run the kernel inside a PyQt5 event loop ('instantqt' integration).

    Sets ``QT_API`` to pyqt5, stores the Qt application on ``kernel.app``,
    attaches every shell stream to that application, then runs the Qt
    event loop.
    """
    # QT_API is set before the guisupport import, as in the original order.
    os.environ['QT_API'] = 'pyqt5'
    from IPython.lib.guisupport import get_app_qt4

    qt_app = get_app_qt4([" "])
    kernel.app = qt_app
    # Closing the last window must not quit the application.
    qt_app.setQuitOnLastWindowClosed(False)

    for shell_stream in kernel.shell_streams:
        _notify_stream_qt(kernel, shell_stream)

    _loop_qt(kernel.app)