blob: 85e780a388607cb60b08c4f8499cfdb689b9b575 [file] [log] [blame]
#!/usr/bin/env python
#
# Server that will accept connections from a Vim channel.
# Used by test_channel.vim.
#
# This requires Python 2.6 or later.
7
8from __future__ import print_function
LemonBoy1b76a8d2022-04-04 21:13:35 +01009from test_channel import ThreadedTCPServer, TestingRequestHandler, \
LemonBoycc766a82022-04-04 15:46:58 +010010 writePortInFile
11import socket
12import threading
13import os
14
# Compatibility shim: make the name FileNotFoundError usable in "except"
# clauses on interpreters that do not define it.
try:
    FileNotFoundError
except NameError:
    # Python 2: a missing file is reported as IOError or OSError, so catch
    # either of them wherever FileNotFoundError is named.
    FileNotFoundError = (IOError, OSError)
20
# Refuse to load on platforms whose socket module has no AF_UNIX constant;
# the server class defined in this file needs it for its address family.
if not hasattr(socket, "AF_UNIX"):
    raise NotImplementedError("Unix sockets are not supported on this platform")
23
class ThreadedUnixServer(ThreadedTCPServer):
    """ThreadedTCPServer variant that listens on a Unix domain socket.

    Only the address family is overridden; everything else is inherited
    from ThreadedTCPServer (imported from test_channel).
    """

    # Makes the underlying socket an AF_UNIX socket instead of the
    # inherited address family.
    address_family = socket.AF_UNIX
26
class ThreadedUnixRequestHandler(TestingRequestHandler):
    """Request handler used by ThreadedUnixServer.

    Adds nothing of its own: all behaviour is inherited unchanged from
    TestingRequestHandler (imported from test_channel).
    """
29
def main(path):
    """Run the channel test server on the Unix domain socket at *path*.

    Creates a ThreadedUnixServer bound to *path*, runs its serve_forever()
    loop in a separate thread, tells the test harness the server is ready,
    and then waits until the server thread ends.  KeyboardInterrupt or
    SystemExit while waiting shuts the server down.  The listening socket
    is always closed before returning.

    Args:
        path: server address handed to ThreadedUnixServer (the script entry
            point passes a socket file name).
    """
    server = ThreadedUnixServer(path, ThreadedUnixRequestHandler)

    # Start a thread with the server.  That thread will then start a new
    # thread for each connection.
    server_thread = threading.Thread(target=server.serve_forever)
    server_thread.start()

    # Signal the test harness we're ready, the port value has no meaning.
    writePortInFile(1234)

    print("Listening on {0}".format(server.server_address))

    # The main thread stays here, polling once per second, until the server
    # thread ends or an interrupt/exit request arrives.
    try:
        while server_thread.is_alive():
            server_thread.join(1)
    except (KeyboardInterrupt, SystemExit):
        # shutdown() only returns once the serve_forever() loop has stopped.
        server.shutdown()
    finally:
        # Either the server thread has already ended or shutdown() has
        # stopped the loop, so nothing is using the listening socket any
        # more; release it instead of leaking it.
        server.server_close()
50
if __name__ == "__main__":
    socket_path = "Xtestsocket"
    # Delete any socket file left behind by an earlier run; it is fine if
    # there is none.
    try:
        os.remove(socket_path)
    except FileNotFoundError:
        pass
    main(socket_path)