logo

Preprost večnitni upravitelj prenosov v Pythonu

A Upravitelj prenosov je v bistvu računalniški program, namenjen prenosu samostojnih datotek iz interneta. Tukaj bomo ustvarili preprost upravitelj prenosov s pomočjo niti v Pythonu. Z uporabo večnitnosti je mogoče datoteko prenesti v obliki kosov hkrati iz različnih niti. Za izvedbo tega bomo ustvarili preprosto orodje ukazne vrstice, ki sprejme URL datoteke in jo nato prenese.

Predpogoji: stroj Windows z nameščenim Pythonom.



Nastavitev

Prenesite spodaj navedene pakete iz ukaznega poziva.

1. Paket Click: Click je paket Python za ustvarjanje čudovitih vmesnikov ukazne vrstice s čim manj kode, kot je potrebno. To je komplet za ustvarjanje vmesnika ukazne vrstice.

pip namestite klik



2. Paket zahtev: V tem orodju bomo prenesli datoteko na podlagi URL-ja (HTTP naslovi). Requests je knjižnica HTTP, napisana v Pythonu, ki vam omogoča pošiljanje zahtev HTTP. S preprostimi slovarji Python lahko dodate glave iz podatkovnih večdelnih datotek in parametrov ter dostopate do odzivnih podatkov na enak način.

zahteve za namestitev pip

3. Paket Threading: Za delo z nitmi potrebujemo paket Threading.



pip namestitev navojev

binarno v bcd

Izvedba

Opomba:

Program je zaradi lažjega razumevanja razdeljen na dele. Prepričajte se, da vam med izvajanjem programa ne manjka noben del kode.

1. korak: uvozite zahtevane pakete

Ti paketi zagotavljajo potrebna orodja, s katerimi lahko spletne zahteve obravnavajo vnose ukazne vrstice in ustvarjajo niti.

Python
import click import requests import threading 

2. korak: Ustvarite funkcijo obravnave

Vsaka nit bo izvedla to funkcijo za prenos določenega dela datoteke. Ta funkcija je odgovorna za zahtevanje samo določenega obsega bajtov in njihovo pisanje na pravilno mesto v datoteki.

Python
def Handler(start end url filename): headers = {'Range': f'bytes={start}-{end}'} r = requests.get(url headers=headers stream=True) with open(filename 'r+b') as fp: fp.seek(start) fp.write(r.content) 

3. korak: Določite glavno funkcijo s klikom

Funkcijo spremeni v pripomoček ukazne vrstice. To določa, kako uporabniki komunicirajo s skriptom iz ukazne vrstice.

Python
#Note: This code will not work on online IDE @click.command(help='Downloads the specified file with given name using multi-threading') @click.option('--number_of_threads' default=4 help='Number of threads to use') @click.option('--name' type=click.Path() help='Name to save the file as (with extension)') @click.argument('url_of_file' type=str) def download_file(url_of_file name number_of_threads): 

4. korak: Nastavite ime datoteke in določite velikost datoteke

Potrebujemo velikost datoteke, da razdelimo prenos med niti in zagotovimo, da strežnik podpira prenose v razponu.

Python
 r = requests.head(url_of_file) file_name = name if name else url_of_file.split('/')[-1] try: file_size = int(r.headers['Content-Length']) except: print('Invalid URL or missing Content-Length header.') return 

5. korak: vnaprej dodelite prostor za datoteke

Vnaprejšnja dodelitev zagotavlja, da je datoteka pravilne velikosti, preden zapišemo kose v določene obsege bajtov.

primerjava javanskih nizov
Python
 part = file_size // number_of_threads with open(file_name 'wb') as fp: fp.write(b'' * file_size) 

6. korak: Ustvarite niti

Niti so dodeljeni določeni obsegi bajtov za vzporedno nalaganje.

Python
 threads = [] for i in range(number_of_threads): start = part * i end = file_size - 1 if i == number_of_threads - 1 else (start + part - 1) t = threading.Thread(target=Handler kwargs={ 'start': start 'end': end 'url': url_of_file 'filename': file_name }) threads.append(t) t.start() 

7. korak: Združite niti

Zagotavlja, da so vse niti dokončane, preden se program konča.

Python
 for t in threads: t.join() print(f'{file_name} downloaded successfully!') if __name__ == '__main__': download_file() 

Koda:

Python
import click import requests import threading def Handler(start end url filename): headers = {'Range': f'bytes={start}-{end}'} r = requests.get(url headers=headers stream=True) with open(filename 'r+b') as fp: fp.seek(start) fp.write(r.content) @click.command(help='Downloads the specified file with given name using multi-threading') @click.option('--number_of_threads' default=4 help='Number of threads to use') @click.option('--name' type=click.Path() help='Name to save the file as (with extension)') @click.argument('url_of_file' type=str) def download_file(url_of_file name number_of_threads): r = requests.head(url_of_file) if name: file_name = name else: file_name = url_of_file.split('/')[-1] try: file_size = int(r.headers['Content-Length']) except: print('Invalid URL or missing Content-Length header.') return part = file_size // number_of_threads with open(file_name 'wb') as fp: fp.write(b'' * file_size) threads = [] for i in range(number_of_threads): start = part * i # Make sure the last part downloads till the end of file end = file_size - 1 if i == number_of_threads - 1 else (start + part - 1) t = threading.Thread(target=Handler kwargs={ 'start': start 'end': end 'url': url_of_file 'filename': file_name }) threads.append(t) t.start() for t in threads: t.join() print(f'{file_name} downloaded successfully!') if __name__ == '__main__': download_file() 


Končali smo z delom kodiranja in zdaj sledite spodnjim ukazom za zagon datoteke .py.

python filename.py –-help

Izhod:

pomoč_izhod' title=python ime datoteke.py –-pomoč


Ta ukaz prikazuje uporabo ukaznega orodja za klik in možnosti, ki jih orodje lahko sprejme. Spodaj je vzorčni ukaz, pri katerem poskušamo prenesti slikovno datoteko jpg z URL-ja in podamo tudi ime in število niti.

Posnetek zaslona-2025-04-07-155058' loading='lazy' title=vzorec ukaza za prenos jpg

Ko vse uspešno zaženete, boste lahko videli svojo datoteko (v tem primeru flower.webp) v imeniku mape, kot je prikazano spodaj:

Posnetek zaslona-2025-04-07-155750' loading='lazy' title=imenik

Končno smo s tem uspešno končali in to je eden od načinov za izdelavo preprostega večnitnega upravitelja prenosov v Pythonu.