# Reusable worker thread
2 import threading
3 import queue
4 import time
5
6
class MyThread(threading.Thread):
    """Reusable worker thread that executes queued jobs one at a time.

    Jobs are submitted with :meth:`add_job` and run in FIFO order on this
    thread. :meth:`run` loops forever, so create the thread with
    ``daemon=True`` if the program should be able to exit.
    """

    def __init__(self, *args, **kwargs):
        """Forward all arguments to ``threading.Thread`` and create the job queue."""
        super().__init__(*args, **kwargs)
        self.que = queue.Queue()  # pending (func, args, kwargs) tuples

    def run(self):
        """Fetch and execute jobs forever.

        A job that raises is reported on stderr and does not stop the worker;
        its queue slot is still marked done so waiters are never left hanging.
        """
        import traceback  # local import: only needed to report failing jobs

        while True:
            func, args, kwargs = self.que.get()  # blocks until a job arrives
            try:
                func(*args, **kwargs)
            except Exception:
                # Keep the worker alive: a dead worker would strand every
                # job still waiting in the queue.
                traceback.print_exc()
            finally:
                # Always decrement the unfinished-task counter, otherwise
                # join_until_queue_empty() would block forever.
                self.que.task_done()

    def add_job(self, func, *args, **kwargs):
        """Queue ``func(*args, **kwargs)`` for execution on this thread."""
        self.que.put((func, args, kwargs))

    def join_until_queue_empty(self):
        """Block until every job queued so far has been processed."""
        self.que.join()
23
24
def func_1(name):
    """Greet ``name``, pause for three seconds, then say goodbye to it."""
    print("hello", name)
    time.sleep(3)
    print("bye", name)
29
30
def func_2():
    """Print a greeting, pause for three seconds, then print a goodbye."""
    print("你好!")
    time.sleep(3)
    print("bye")
35
36
if __name__ == '__main__':
    # Run the worker as a daemon thread: its run() loop never returns, so a
    # non-daemon thread would keep the program from exiting.
    worker = MyThread(daemon=True)
    worker.add_job(func_1, "gkl")
    worker.add_job(func_2)
    worker.start()
    # Wait for both queued jobs to finish before the main thread exits.
    worker.join_until_queue_empty()
# 线程池的实现
1 import threading
2 import queue
3 import time
4
5
def task_1():
    """Print a greeting, pause for two seconds, then print a goodbye."""
    print("hello")
    time.sleep(2)
    print("bye")
10
11
def task_2():
    """Print a Chinese greeting, pause for two seconds, then print a goodbye."""
    print("你好!")
    time.sleep(2)
    print("bye")
16
17
class MyThreadPool():
    """Minimal thread pool: ``n`` daemon workers consuming one shared task queue."""

    def __init__(self, n):
        """Start ``n`` daemon worker threads.

        Raises:
            ValueError: if ``n`` is less than 1. A pool without workers could
                never drain its queue, so ``thread_join`` would block forever.
        """
        if n < 1:
            raise ValueError("MyThreadPool needs at least one worker thread")
        self.queue = queue.Queue()  # pending (func, args, kwargs) tuples
        for _ in range(n):
            threading.Thread(target=self.run, daemon=True).start()

    def run(self):
        """Worker loop: fetch and execute tasks forever.

        A task that raises is reported on stderr and does not stop the worker;
        its queue slot is still marked done so ``thread_join`` cannot hang.
        """
        import traceback  # local import: only needed to report failing tasks

        while True:
            func, args, kwargs = self.queue.get()  # blocks until a task arrives
            try:
                func(*args, **kwargs)
            except Exception:
                # Keep the worker alive so the pool does not silently shrink.
                traceback.print_exc()
            finally:
                # Always decrement the unfinished-task counter.
                self.queue.task_done()

    def get_task(self, func, *args, **kwargs):
        """Submit ``func(*args, **kwargs)`` to the pool.

        Positional and keyword arguments are optional, so submitting a plain
        zero-argument callable works as well.
        """
        self.queue.put((func, args, kwargs))

    def thread_join(self):
        """Block until every task submitted so far has been processed."""
        self.queue.join()
35
36
if __name__ == '__main__':
    pool = MyThreadPool(4)
    # Submit both demo tasks, then wait until the workers have finished them.
    for task in (task_1, task_2):
        pool.get_task(task)
    pool.thread_join()
![]()
# Python's built-in thread pool
2 from multiprocessing.pool import ThreadPool
3 import time
4
5
def task_1(name):
    """Greet ``name``, pause for two seconds, then print a goodbye."""
    print("hello", name)
    time.sleep(2)
    print("bye")
10
11
def task_2():
    """Print a Chinese greeting, pause for two seconds, then print a goodbye."""
    print("你好!")
    time.sleep(2)
    print("bye")
16
17
if __name__ == '__main__':
    pool = ThreadPool(processes=4)
    pool.apply_async(task_1, ("gkl",))
    pool.apply_async(task_2)
    # close() stops new submissions; join() then waits for the queued tasks.
    pool.close()
    pool.join()
# Concurrent server built on a thread pool
2 from multiprocessing.pool import ThreadPool
3 import socket
4
5
class Server(object):
    """TCP echo server that handles each client on a thread-pool worker."""

    def __init__(self, host="0.0.0.0", port=9999):
        """Create the listening socket.

        Args:
            host: address to bind to; defaults to all interfaces.
            port: TCP port to listen on; defaults to 9999.
        """
        self.server = socket.socket()
        self.server.bind((host, port))
        self.server.listen()

    def run(self):
        """Accept connections forever, handing each one to the pool."""
        pool = ThreadPool(100)
        while True:
            client = self.server.accept()  # (connection, address) tuple
            pool.apply_async(self.response, args=(client, ))

    def response(self, cli):
        """Echo data back to one client until it disconnects or sends ``close``.

        Args:
            cli: the ``(connection, address)`` tuple returned by ``accept()``.

        The connection is always closed before this method returns, including
        when the peer drops the connection or a socket error occurs.
        """
        conn, addr = cli
        try:
            while True:
                data = conn.recv(1024)
                # recv() returns b"" once the peer has closed its side; without
                # this check the loop would spin forever on a dead connection.
                if not data or data == b"close":
                    break
                # errors="replace" keeps non-UTF-8 payloads from raising and
                # killing the handler before the socket is closed.
                print("接收来自{}的信息:{}".format(addr, data.decode(errors="replace")))
                # sendall() retries until the whole payload has been written;
                # send() may transmit only part of it.
                conn.sendall(data)
        except OSError as exc:
            # Treat a broken connection as a disconnect, but say why.
            print("{}连接异常:{}".format(addr, exc))
        finally:
            conn.close()
        print("{}断开连接".format(addr))
28
29
if __name__ == '__main__':
    # Bind the listening socket, then serve clients until the process is killed.
    s = Server()
    s.run()