blob: 4836e26ab82d73838ca4c7f9244acb4d1ab0eda7 [file] [log] [blame]
LemonBoycc766a82022-04-04 15:46:58 +01001#!/usr/bin/env python
2#
3# Server that will accept connections from a Vim channel.
4# Used by test_channel.vim.
5#
6# This requires Python 2.6 or later.
7
8from __future__ import print_function
9from test_channel import ThreadedTCPServer, ThreadedTCPRequestHandler, \
10 writePortInFile
11import socket
12import threading
13import os
14
15try:
16 FileNotFoundError
17except NameError:
18 # Python 2
19 FileNotFoundError = (IOError, OSError)
20
21class ThreadedUnixServer(ThreadedTCPServer):
22 address_family = socket.AF_UNIX
23
24def main(path):
25 server = ThreadedUnixServer(path, ThreadedTCPRequestHandler)
26
27 # Start a thread with the server. That thread will then start a new thread
28 # for each connection.
29 server_thread = threading.Thread(target=server.serve_forever)
30 server_thread.start()
31
32 # Signal the test harness we're ready, the port value has no meaning.
33 writePortInFile(1234)
34
35 print("Listening on {0}".format(server.server_address))
36
37 # Main thread terminates, but the server continues running
38 # until server.shutdown() is called.
39 try:
40 while server_thread.is_alive():
41 server_thread.join(1)
42 except (KeyboardInterrupt, SystemExit):
43 server.shutdown()
44
45if __name__ == "__main__":
46 try:
47 os.remove("Xtestsocket")
48 except FileNotFoundError:
49 pass
50 main("Xtestsocket")