想提高計算速度?作爲數據科學家你應該知道這些 python 多線程、進程知識

 2019-09-18 19:28:00.0

想提高計算速度?作爲數據科學家你應該知道這些 python 多線程、進程知識

每個數據科學項目遲早都會面臨一個不可避免的挑戰:速度問題。使用更大的數據集會導致處理速度變慢,因此最終必須想辦法優化算法的運行時間。正如你們大多數人已經知道的,並行化是這種優化的必要步驟。python 爲並行化提供了兩個內置庫:多處理和線程。在這篇文章中,我們將探討數據科學家如何在兩者之間進行選擇,以及在這樣做時應注意哪些因素。

並行計算與數據科學

衆所周知,數據科學是處理大量數據並從中提取有用見解的科學。通常情況下,我們對數據執行的操作很容易並行化,這意味着不同的處理代理可以一次對數據執行一個操作,最後進行組合以獲得完整的結果。

爲了更好地解釋並行性,讓我們拿一個真實世界的例子作爲類比。假設你需要打掃你家的三個房間。你可以自己打掃,打掃完一個再打掃另一個,也可以讓你的兩個兄弟姐妹幫你打掃,每個人打掃一個房間。在後一種方法中,每個人完成整個任務的一部分,從而減少了完成任務所需的總時間。這就是實際中的並行性。

並行處理可以用 python 以兩種不同的方式實現:多處理和線程。

多處理與線程:理論

基本上,多處理和線程是實現並行計算的兩種方法,分別使用進程和線程作爲處理代理。爲了理解它們的工作原理,我們必須搞清楚什麼是進程和線程。

想提高計算速度?作爲數據科學家你應該知道這些 python 多線程、進程知識

進程

進程是正在執行的計算機程序的實例。每個進程都有自己的內存空間,用來存儲正在運行的指令,以及需要存儲和訪問才能執行的任何數據。

線程

線程是進程的組件,可以並行運行。一個進程中可以有多個線程,它們共享相同的內存空間,即父進程的內存空間。這意味着要執行的代碼以及程序中聲明的所有變量將由所有線程共享。

想提高計算速度?作爲數據科學家你應該知道這些 python 多線程、進程知識

例如,讓我們回想一下正在你的計算機上運行的程序。你可能正在瀏覽器中閱讀本文,瀏覽器可能打開了多個選項卡。你也可以同時通過 Spotify 桌面應用程序收聽音樂。瀏覽器和 spotify 應用程序是不同的進程;每個進程都可以使用多個進程或線程來實現並行性。瀏覽器中的不同選項卡可能在不同的線程中運行。Spotify 可以在一個線程中播放音樂,在另一個線程中從 Internet 下載音樂,並使用第三個線程顯示圖形用戶界面。這稱爲多線程。對多個進程進行多處理也可以做到這一點。事實上,像 chrome 和 firefox 這樣的大多數現代瀏覽器使用多處理,而不是多線程來處理多個選項卡。

技術細節

一個進程的所有線程都存在於同一個內存空間中,而進程有各自的內存空間。

與進程相比,線程更輕量級,開銷更低。生成進程比生成線程慢一點。

在線程之間共享對象更容易,因爲它們共享相同的內存空間。爲了實現同一個進程間通信,我們必須使用某種 IPC (inter-process communication) 模型,它通常由 OS 提供。

並行計算的陷阱

將並行性引入程序並不總是一個正和博弈,也有一些陷阱需要注意。其中,最重要的是下面的這些問題。

  • 競爭條件:正如我們已經討論過的,線程有一個共享內存空間,因此它們可以訪問共享變量。當多個線程試圖同時更改同一個變量時,會出現競爭條件。線程調度程序可以在線程之間任意交換,因此我們無法知道線程嘗試更改數據的順序。這可能會導致兩個線程中的任何一個出現不正確的行爲,特別是當線程決定基於變量的值執行某些操作時。爲了防止這種情況發生,可以在修改變量的代碼段周圍放置互斥鎖,以便一次只能有一個線程寫入變量。

  • 飢餓:當一個線程在較長時間內被拒絕訪問某個特定的資源時,就會發生飢餓,在這種情況下,整個程序的速度會減慢。這可能是由於線程調度算法設計不當而產生的意外副作用。

  • 死鎖:過度使用互斥鎖也有一個缺點——它會在程序中引入死鎖。死鎖是一個線程等待另一個線程釋放鎖時的狀態,但另一個線程需要一個資源來完成第一個線程保持的操作。這樣,兩個線程都會停止,程序也會停止。死鎖可以被認爲是飢餓的極端情況。爲了避免這種情況,我們必須小心不要引入太多相互依賴的鎖。

  • 活鎖:活鎖是指線程在循環中繼續運行,但沒有任何進展。這也是由於互斥鎖設計不當和使用不當造成的。

python 中的多處理和線程

全局解釋器鎖

說到 python,有一些奇怪的地方需要記住。我們知道線程共享相同的內存空間,因此必須採取特殊的預防措施,以便兩個線程不會寫入相同的內存位置。CPython 解釋器使用名爲 GIL 的機制或全局解釋器鎖來處理這個問題。

python wiki 上面的資料:

在 CPython 中,全局解釋器鎖(GIL)是一個互斥鎖,它保護對 python 對象的訪問過程,防止多個線程同時執行 python 字節碼。這個鎖是必要的,這主要是因爲 CPython 的內存管理不是線程安全的。

瞭解 python GIL 的詳細信息,請查看:https://www.dabeaz.com/python/UnderstandingGIL.pdf

GIL 完成了任務,但付出了代價。它在解釋器級別上有效地序列化指令。其工作原理如下:任何線程要執行任何函數,都必須獲取全局鎖。一次只有一個線程可以獲取該鎖,這意味着解釋器最終會以串行方式運行指令。這種設計使得內存管理線程安全,但結果是,它根本不能利用多個 cpu 內核。在單核 cpu 中,這不是什麼大問題。但是如果你使用多核 cpu,這個全局鎖最終會成爲一個瓶頸。

但是,如果你的程序在其他地方(例如在網絡、IO 或用戶交互中)有更嚴重的瓶頸,則此瓶頸將變得無關緊要。在這些情況下,線程是一種完全有效的並行化方法。但對於 CPU 受限的程序,線程最終會使程序變慢。讓我們通過一些示例用例來探討這個問題。

線程的使用案例

GUI 程序始終使用線程來使應用程序響應。例如,在文本編輯程序中,一個線程負責記錄用戶輸入,另一個線程負責顯示文本,第三個線程負責拼寫檢查,等等。在這裏,程序必須等待用戶交互,這是最大的瓶頸。使用多處理不會使程序更快。

線程的另一個用例是 io 綁定或網絡綁定的程序,例如 web-scrapers。在這種情況下,多個線程可以同時處理多個網頁的刮擦。線程必須從 Internet 下載網頁,這將是最大的瓶頸,因此線程是一個完美的解決方案。Web 服務器是受網絡約束的,工作原理與此類似;有了它們,多處理就沒有線程的優勢了。另一個相關的例子是 tensorflow,它使用線程池並行地轉換數據。

多處理的使用案例

如果程序是 CPU 密集型的,並且不需要進行任何 IO 或用戶交互,那麼多處理就比線程更加突出。例如,任何一個只處理數字的程序都可以使用多處理得到極大的加速;事實上,線程可能會減慢它的速度。一個有趣的實際例子是 Pytorch Dataloader,它使用多個子進程將數據加載到 GPU 中。

python 中的並行化

python 爲同名的並行化方法提供了兩個庫——多處理和線程。儘管它們之間有着根本的區別,但這兩個庫提供了非常相似的 API(從 python 3.7 開始)。讓我們來具體看看吧。

import threading

import random

from functools import reduce


def func(number):
   random_list = random.sample(range(1000000), number)
   return reduce(lambda x, y: x*y, random_list)

number = 50000

thread1 = threading.Thread(target=func, args=(number,))

thread2 = threading.Thread(target=func, args=(number,))


thread1.start()

thread2.start()


thread1.join()

thread2.join()

你可以看到,我創建了一個函數 func,它創建一個隨機數列表,然後按順序將其所有元素相乘。如果物品數量足夠大,比如說 5 萬或 10 萬件,這可能是一個相當繁重的過程。

然後,我創建了兩個線程來執行同一個函數。線程對象有一個異步啓動線程的 start 方法。如果我們想等待它們終止並返回,我們必須調用 join 方法,這就是我們在上面所做的。

如你所見,在後臺將新線程轉到任務的 API 非常簡單。最棒的是,用於多處理的 API 也幾乎完全相同;讓我們來檢查一下吧~

import multiprocessing

import randomfrom functools

import reduce


def func(number):
   random_list = random.sample(range(1000000), number)
   return reduce(lambda x, y: x*y, random_list)

number = 50000

process1 = multiprocessing.Process(target=func, args=(number,))

process2 = multiprocessing.Process(target=func, args=(number,))


process1.start()

process2.start()


process1.join()

process2.join()

在這裏它只是交換線程。有着多處理的線程。

顯然,你可以用它做很多事情,但這不在本文的範圍內,所以我們不在這裏討論。如果你有興趣瞭解更多信息,請查看這裏和這裏的文檔:https://docs.python.org/3/library/threading.html 和 https://docs.python.org/3/library/threading.html   。

基準點

現在我們已經瞭解了實現並行化的代碼是什麼樣子的,讓我們回到性能問題上來。如前所述,線程不適合用於 CPU 限制的任務;在這些情況下,它最終成爲一個瓶頸。我們可以使用一些簡單的基準來驗證這一點。

首先,讓我們看看在我上面展示的代碼示例中,線程處理與多處理是如何比較的。請記住,此任務不涉及任何類型的 IO,因此它是純 CPU 綁定的任務。

想提高計算速度?作爲數據科學家你應該知道這些 python 多線程、進程知識

讓我們看看一個 IO 綁定任務的類似基準。例如,以下函數:

import requestsdef func(number):
   url = 'http://example.com/'
   for i in range(number):
       response = requests.get(url)
       with open('example.com.txt', 'w') as output:
           output.write(response.text)

這個函數只是獲取一個網頁並將其保存到一個本地文件中,循環多次。無用但直截了當,因此很適合演示。讓我們看看基準是什麼吧。

想提高計算速度?作爲數據科學家你應該知道這些 python 多線程、進程知識

現在,從這兩張圖表中可以注意到以下幾點:

  • 在這兩種情況下,單個進程的執行時間都比單個線程長。顯然,進程比線程有更多的開銷。

  • 對於受 CPU 限制的任務,多個進程的性能比多個線程要好。然而,當我們使用 8x 並行化時,這種差異就變得不那麼明顯了。由於我的筆記本電腦中的處理器是四核的,因此最多有四個進程可以有效地使用多核。所以當我使用更多的進程時,它的伸縮性就不好。但是,它仍然比線程性能好很多,因爲線程根本不能利用多個核。

  • 對於 IO 綁定的任務,瓶頸不是 CPU。因此,GIL 帶來的通常限制在這裏不適用,多處理也沒有優勢。不僅如此,線程的輕量級開銷實際上使它們比多處理更快,並且線程始終優於多處理。

差異、優缺點

  • 線程在相同的內存空間中運行;進程有單獨的內存。

  • 從前面的觀點來看:在線程之間共享對象更容易,但與此同時,你必須採取額外的措施來實現對象同步,以確保兩個線程不會同時寫入同一個對象,並且不會出現爭用情況。

  • 由於對象同步增加了編程開銷,多線程編程更容易出現錯誤。另一方面,多進程編程很容易實現。

  • 與進程相比,線程的開銷更低;生成進程比線程花費更多的時間。

  • 由於 python 中 GIL 的侷限性,線程不能利用多個 CPU 覈實現真正的並行。多處理沒有任何這樣的限制。

  • 進程調度由操作系統處理,而線程調度則由 python 解釋器完成。

  • 子進程是可中斷和可終止的,而子線程不是。你必須等待線程終止或加入。

從所有這些討論中,我們可以得出以下結論:

  • 線程應該用於涉及 IO 或用戶交互的程序。

    • 多處理應該用於 CPU 受限、計算密集型的程序。

從數據科學家的角度

典型的數據處理管道可分爲以下步驟:

  1. 讀取原始數據並存儲到主存儲器或 GPU 中;

  2. 使用 CPU 或 GPU 進行計算;

  3. 將挖掘出的信息存儲在數據庫或磁盤中。

讓我們來探索如何在這些任務中引入並行性,從而加快它們的速度。

步驟 1 包括了從磁盤讀取數據,因此很明顯磁盤 IO 將成爲此步驟的瓶頸。正如我們所討論的,線程是並行這種操作的最佳選擇。同樣,步驟 3 也是引入線程的理想候選步驟。

但是,步驟 2 包含涉及 CPU 或 GPU 的計算。如果是基於 CPU 的任務,那麼使用線程將毫無用處;相反,我們必須進行多處理。只有這樣,我們才能利用 CPU 的多個核並實現並行性。如果這是一個基於 GPU 的任務,因爲 GPU 已經在硬件級別實現了一個大規模並行化的體系結構,那麼使用正確的接口(庫和驅動程序)與 GPU 交互應該可以處理剩下的事情。

想提高計算速度?作爲數據科學家你應該知道這些 python 多線程、進程知識

現在你可能會想,「我的數據管道看起來與此有些不同;我有一些任務並不真正適合這個通用框架。」不過,在這裏你應該考慮的因素是:

  • 你的任務是否有任何形式的 IO

  • IO 是否是程序的瓶頸

  • 你的任務是否取決於 CPU 的大量計算

考慮到這些因素,再加上上面的要點,你應該能夠做出決定。另外,請記住,你不必在整個程序中使用單一形式的並行,而是應該在程序的不同部分使用不同的並行。

現在我們來看看數據科學家可能面臨的兩個常見場景,以及如何使用並行計算來加速它們。

場景 1:下載電子郵件

假設你想分析自己創業公司收件箱中的所有電子郵件,並瞭解其趨勢:誰是最頻繁的發件人,電子郵件中出現的最常見關鍵字是什麼,一週中的哪一天或一天中的哪一小時收到的電子郵件最多,等等。當然,這個項目的第一步是將電子郵件下載到你的計算機上。

首先,讓我們按順序進行,而不使用任何並行化。下面是要使用的代碼,應該非常簡單明瞭。有一個下載電子郵件的功能,它以電子郵件 ID 列表作爲輸入,並按順序下載它們。這個函數一次調用 100 個電子郵件的 ID 列表。

import imaplib

import time

IMAP_SERVER = 'imap.gmail.com'

USERNAME = 'username@gmail.com'

PASSWORD = 'password'


def download_emails(ids):
   client = imaplib.IMAP4_SSL(IMAP_SERVER)
   client.login(USERNAME, PASSWORD)
   client.select()
   for i in ids:
       print(f'Downloading mail id: {i.decode()}')
       _, data = client.fetch(i, '(RFC822)')
       with open(f'emails/{i.decode()}.eml', 'wb') as f:
           f.write(data[0][1])
   client.close()
   print(f'Downloaded {len(ids)} mails!')


start = time.time()


client = imaplib.IMAP4_SSL(IMAP_SERVER)

client.login(USERNAME, PASSWORD)

client.select()

_, ids = client.search(None, 'ALL')

ids = ids[0].split()

ids = ids[:100]

client.close()


download_emails(ids)

print('Time:', time.time() - start)

所用時間:35.65300488471985 秒。

現在讓我們在這個任務中引入一些並行性來加快速度。在開始編寫代碼之前,我們必須在線程和多處理之間做出決定。正如你目前所瞭解到的,當任務的瓶頸是 IO 時,線程是最好的選擇。這裏的任務顯然屬於這一類,因爲它正在通過 Internet 訪問 IMAP 服務器。所以我們要開始使用線程了。

我們將要使用的大部分代碼將與我們在順序案例中使用的代碼相同。唯一不同的是,我們將把 100 個電子郵件 ID 的列表分成 10 個較小的塊,每個塊包含 10 個 ID,然後創建 10 個線程,並使用每個線程的不同塊調用 download_emails 函數。我正在使用 python 標準庫中的 concurrent.futures.threadpoolexecutor 類進行線程處理。

import imaplib

import time

from concurrent.futures import ThreadPoolExecutor

IMAP_SERVER = 'imap.gmail.com'

USERNAME = 'username@gmail.com'

PASSWORD = 'password'


def download_emails(ids):
   client = imaplib.IMAP4_SSL(IMAP_SERVER)
   client.login(USERNAME, PASSWORD)
   client.select()
   for i in ids:
       print(f'Downloading mail id: {i.decode()}')
       _, data = client.fetch(i, '(RFC822)')
       with open(f'emails/{i.decode()}.eml', 'wb') as f:
           f.write(data[0][1])
   client.close()


start = time.time()


client = imaplib.IMAP4_SSL(IMAP_SERVER)

client.login(USERNAME, PASSWORD)

client.select()

_, ids = client.search(None, 'ALL')

ids = ids[0].split()

ids = ids[:100]

client.close()


number_of_chunks = 10

chunk_size = 10

executor = ThreadPoolExecutor(max_workers=number_of_chunks)

futures = []

for i in range(number_of_chunks):
   chunk = ids[i*chunk_size:(i+1)*chunk_size]
   futures.append(executor.submit(download_emails, chunk))


for future in concurrent.futures.as_completed(futures):
   pass

print('Time:', time.time() - start)

所用時間:9.841094255447388 秒。

如你所見,線程大大加快了它的速度。

場景 2:使用 scikit learn 進行分類

假設你有一個分類問題,你想使用一個隨機森林分類器。由於這是一種標準的、衆所周知的機器學習算法,我們不需要重新發明輪子,而只需使用 RandomForestClassifier 即可。

以下代碼用於演示。我使用助手函數 sklearn.datasets.make_classification 創建了一個分類數據集,然後在此基礎上訓練了一個 RandomForestClassifier。另外,我正在計時代碼中完成模型擬合核心工作的部分。

from sklearn.ensemble import RandomForestClassifier

from sklearn import datasets

import time

X, y = datasets.make_classification(n_samples=10000, n_features=50, n_informative=20, n_classes=10)


start = time.time()

model = RandomForestClassifier(n_estimators=500)

model.fit(X, y)

print('Time:', time.time()-start)

任務花費時間:34.17733192443848 秒。

現在我們將研究如何減少該算法的運行時間。我們知道這個算法可以在一定程度上並行化,但是什麼樣的並行化纔是合適的呢?它沒有任何 IO 瓶頸,相反,它是一個非常 CPU 密集型的任務。所以多處理是合乎邏輯的選擇。

幸運的是,sklearn 已經在這個算法中實現了多處理,我們不必從頭開始編寫它。正如你在下面的代碼中看到的,我們只需要提供一個參數 n_jobs(它應該使用的進程數)來啓用多處理。

from sklearn.ensemble import RandomForestClassifier

from sklearn import datasets

import time

X, y = datasets.make_classification(n_samples=10000, n_features=50, n_informative=20, n_classes=10)


start = time.time()

model = RandomForestClassifier(n_estimators=500, n_jobs=4)

model.fit(X, y)

print('Time:', time.time()-start)

所用時間:14.576200723648071 秒。

正如預期的那樣,多處理使其速度更快。

結論

大多數(如果不是所有的)數據科學項目將會發現並行計算能大幅提高計算速度。事實上,許多流行的數據科學庫已經內置了並行性,你只需啓用它即可。因此,在嘗試自己實現它之前,請查看正在使用的庫的文檔,並檢查它是否支持並行性。如果沒有,本文將幫助你自己實現它。

via:https://blog.floydhub.com/multiprocessing-vs-threading-in-python-what-every-data-scientist-needs-to-know/


文章來源:雷鋒網