Python
- Abstract Base Classes
- Coroutine examples
- Cheat Sheet: (pdf, html, source)
Hello Summary
CONSTANT = 10

# recursive, exception, conditions, lambda
def fib(n):
    if n < 0:
        raise Exception('n={}, should be positive'.format(n))
    if n == 0:
        return 0
    check_n = lambda n: n == 1 or n == 2
    if check_n(n):
        return 1
    return fib(n-1) + fib(n-2)
# generator
def gen(n):
    yield 1
    for i in range(5, n):
        yield i
# class, attribute, method
class math:
    def __init__(self, c=CONSTANT):
        self.c = c
    @staticmethod
    def addition(a, b=2):
        return a+b
    def times_ten_and_hundred(self, n):
        return n*self.c, n*self.c**2
if __name__ == '__main__':
    # variables
    a, b = 1, 3
    # list/array(l), tuple(t), dict(d)
    l, t, d = [a, b], (a, b), {'a': a, 'b': b}
    # generator
    g = gen(6)
    # next(g) is equivalent to calling g.__next__()
    print(next(g))  # 1
    print(next(g))  # 5
    print(next(g))  # raises StopIteration
    try:
        fib(-1)
    except Exception as e:
        print("error: {}".format(e))  # error: n=-1, should be positive
    print(math.addition(a, b=b))  # 4
    print(math.addition(d['a']))  # 3
    # instance
    r1, r2 = math().times_ten_and_hundred(5)
    assert r1 == 50   # ok
    assert r2 == 500  # ok
Data Structure
List
>>> a = []
>>> a.append(1)
>>> a.append(2)
>>> a.append(3)
>>> a.pop()   # stack <=> lifo
3
>>> a.pop(0)  # queue <=> fifo; O(n) on a list, O(1) if backed by a linked list (see the deque sketch below)
1
>>> a
[2]
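list.pop(0) shifts every remaining element, so for a real fifo queue collections.deque is the usual choice; a quick sketch:
>>> from collections import deque
>>> q = deque([1, 2, 3])
>>> q.append(4)   # O(1) push on the right
>>> q.popleft()   # O(1) pop on the left (fifo)
1
>>> q.pop()       # O(1) pop on the right (lifo)
4
>>> q
deque([2, 3])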
Note: a list can hold elements of different types (int, dict, ...), while arrays from the array module cannot.
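For example (illustrative snippet; array.array stores only the numeric type named by its typecode):
>>> l = [1, {'a': 1}, 'two']           # a list accepts mixed types
>>> import array
>>> arr = array.array('i', [1, 2, 3])  # 'i' = signed int typecode
>>> arr.append('x')                    # raises TypeError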
Linked List
See linked list
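There is no linked list class in the stdlib (deque is the closest); a minimal hand-rolled sketch, with illustrative names Node/push, showing the O(1) head insertion the List note above refers to:
class Node:
    def __init__(self, value, next=None):
        self.value, self.next = value, next

def push(head, value):
    # O(1) insertion at the head; returns the new head
    return Node(value, head)

head = push(push(push(None, 3), 2), 1)  # 1 -> 2 -> 3
node = head
while node:
    print(node.value)  # 1, then 2, then 3
    node = node.next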
Sorting algorithms
See sorting
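As a teaser for the linked page: the built-in sort is Timsort (O(n log n), stable), and a quicksort is short to hand-roll; `quicksort` below is an illustrative sketch, not the linked page's code:
def quicksort(seq):
    if len(seq) <= 1:
        return seq
    pivot, rest = seq[0], seq[1:]
    return (quicksort([x for x in rest if x < pivot])
            + [pivot]
            + quicksort([x for x in rest if x >= pivot]))

assert quicksort([3, 1, 2, 1]) == sorted([3, 1, 2, 1])  # [1, 1, 2, 3]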
Exceptions
class Custom(Exception):
pass
try:
print(does_not_exists)
except Exception as e:
raise Custom("Your variable does not exists") from e
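A quick check that `from e` keeps the original error as __cause__ (use `from None` to suppress chaining); `does_not_exist` is deliberately undefined:
try:
    try:
        print(does_not_exist)
    except Exception as e:
        raise Custom("Your variable does not exist") from e
except Custom as c:
    print(type(c.__cause__).__name__)  # NameError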
Typing
from __future__ import annotations

def concat(a: list | str, b: list | str):
    return a + b
>>> concat("hello", " world")
'hello world'
>>> concat([1, 2], [3, 4])
[1, 2, 3, 4]
See pep-0604
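On Python 3.10+ the `X | Y` syntax also works at runtime, e.g. in isinstance() (a small sketch):
>>> isinstance("hi", int | str)  # Python 3.10+
True
>>> def maybe_len(x: str | None) -> int:  # `str | None` replaces Optional[str]
...     return 0 if x is None else len(x)
...
>>> maybe_len(None)
0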
Concurrency Compute (thread/process)
Example 1 (custom job throttling)
from concurrent import futures
import time

def task(seq):
    print('Start:', time.strftime('%X'))
    time.sleep(max(seq))
    print('End:', time.strftime('%X'))
    return list(reversed(seq))

def main():
    max_workers = 2
    timeout = 300
    jobs = []
    not_done = set()
    pendings = 0
    with futures.ProcessPoolExecutor(max_workers=max_workers) as executor:  # you can also use futures.ThreadPoolExecutor
        for seq in [[3, 2, 1], [10, 5, 0]]:
            if pendings > max_workers:
                _u, not_done = futures.wait(not_done, timeout, return_when=futures.FIRST_COMPLETED)
                pendings = len(not_done)
                print(f"{pendings} jobs pending")
            job = executor.submit(task, seq)
            jobs.append(job)
            not_done.add(job)
            pendings += 1
    done, not_done = futures.wait(jobs, timeout, return_when=futures.ALL_COMPLETED)
    results = [d.result() for d in done]
    return results
>>> r = main()
Start: 15:08:39
End: 15:08:49
Start: 15:08:49
End: 15:08:52
>>> r
[[1, 2, 3], [0, 5, 10]]
Example 2 (shortened)
from concurrent import futures
import time

def task(seq):
    print('Start:', time.strftime('%X'))
    time.sleep(max(seq))
    print('End:', time.strftime('%X'))
    return list(reversed(seq))

def main():
    results = []
    with futures.ThreadPoolExecutor(max_workers=2) as executor:
        tasks = {executor.submit(task, seq=seq) for seq in [[3, 2, 1], [10, 5, 0]]}
        for completed_task in futures.as_completed(tasks, timeout=60):
            results.append(completed_task.result())
    return results
>>> r = main()
Start: 12:30:36
Start: 12:30:36
End: 12:30:39
End: 12:30:46
>>> r
[[1, 2, 3], [0, 5, 10]]
Concurrency IO (Async)
Useful when you read from or write to disk or network devices (e.g. TCP connections).
Read also:
- official doc for asyncio
- gather(), wait(), and TaskGroup() (see the TaskGroup sketch below)
- How does asyncio work?
- How to speed up async requests in Python
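Since the list above mentions TaskGroup(), here is a minimal sketch (Python 3.11+); unlike gather(), a TaskGroup cancels the remaining tasks if one of them raises:
import asyncio

async def coroutine(seq):
    await asyncio.sleep(max(seq))
    return list(reversed(seq))

async def main():
    async with asyncio.TaskGroup() as tg:  # Python 3.11+
        t = tg.create_task(coroutine([3, 2, 1]))
        t2 = tg.create_task(coroutine([10, 5, 0]))
    # leaving the `async with` block awaits all tasks
    return t.result(), t2.result()

asyncio.run(main())  # ([1, 2, 3], [0, 5, 10])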
Classic usage
import asyncio
import time

async def coroutine(seq):
    await asyncio.sleep(max(seq))
    return list(reversed(seq))

async def main():
    t = asyncio.create_task(coroutine([3, 2, 1]))
    t2 = asyncio.create_task(coroutine([10, 5, 0]))  # Python 3.7+
    print('Start:', time.strftime('%X'))
    a1 = await asyncio.gather(t, t2)
    print('End:', time.strftime('%X'))  # Should be 10 seconds
    print(f'Both tasks done: {all((t.done(), t2.done()))}')
    print('Start:', time.strftime('%X'))
    a2 = await asyncio.gather(coroutine([1, 2, 3]), coroutine([1, 2, 2]))
    print('End:', time.strftime('%X'))  # Should be 3 seconds
    return a1, a2
Note: don't use time.sleep() inside coroutines; it blocks the whole event loop (use asyncio.sleep() instead).
>>> a1, a2 = asyncio.run(main())
Start: 15:08:39
End: 15:08:49
Both tasks done: True
Start: 15:08:49
End: 15:08:52
>>> a1
[[1, 2, 3], [0, 5, 10]]
Run lot of asynchronous functions with eventloop
import asyncio
import time

loop = asyncio.new_event_loop()  # asyncio.get_running_loop() only works inside a running coroutine

# run_in_executor uses threads by default, but you can switch to processes:
# import concurrent.futures
# loop.set_default_executor(concurrent.futures.ProcessPoolExecutor())
# you can also use:
# with concurrent.futures.ProcessPoolExecutor() as pool_executor:
#     loop.run_in_executor(pool_executor, func, params)

async def send_request(id):
    while not loop.is_closed():
        print(time.strftime('%X'), ' Hello', id)
        # non-blocking, so other tasks can take the lead,
        # see https://stackoverflow.com/questions/56729764/asyncio-sleep-vs-time-sleep
        await asyncio.sleep(id)
        print(time.strftime('%X'), ' Done', id)
        # alternative: drop the `while not loop.is_closed()` loop and reschedule with
        # loop.create_task(send_request(id)); this creates a new task on every call,
        # which is more costly, and a dying task spawning its replacement can be risky

for i in range(1, 32):
    loop.create_task(send_request(i))
try:
    loop.run_forever()
except KeyboardInterrupt:
    [task.cancel() for task in asyncio.all_tasks(loop)]
finally:
    loop.close()
Result:
11:40:26 Hello 1
11:40:26 Hello 2
11:40:26 Hello 3
11:40:26 Hello 4
11:40:26 Hello 5
11:40:26 Hello 6
...
11:40:26 Hello 30
11:40:26 Hello 31
11:40:27 Done 1
11:40:27 Hello 1
11:40:28 Done 2
11:40:28 Done 1
11:40:28 Hello 2
11:40:28 Hello 1
11:40:29 Done 3
11:40:29 Hello 3
...
Run synchronous functions in different threads
In case you can't modify a function (signature or body) but still want to run it in a thread or process while other tasks continue in parallel. Careful: if something inside the function blocks, you can't do anything about it and it will monopolize its thread, which becomes a problem when you have more calls than available threads.
import asyncio
import time
import concurrent.futures

def synchronous(i):
    print(time.strftime('%X'), ' Hello', i)
    time.sleep(i)  # simulate work that can't be predicted and/or changed
    print(time.strftime('%X'), ' Done', i)

async def sync_wrapper(i):
    loop = asyncio.get_running_loop()
    while True:  # optional: in this example each sync function is recalled when done
        # NOTE executor=None also works: it submits to the loop's default
        # ThreadPoolExecutor, so the call still runs in a worker thread
        with concurrent.futures.ThreadPoolExecutor() as executor:
            await loop.run_in_executor(executor, synchronous, i)

loop = asyncio.new_event_loop()  # asyncio.get_running_loop() only works inside a running coroutine
for i in range(1, 4):
    loop.create_task(sync_wrapper(i))
try:
    loop.run_forever()
finally:
    loop.close()
Result:
19:01:23 Hello 1
19:01:23 Hello 2
19:01:23 Hello 3
19:01:24 Done 1
19:01:24 Hello 1
19:01:25 Done 2
19:01:25 Hello 2
19:01:25 Done 1
19:01:25 Hello 1
19:01:26 Done 3
19:01:26 Hello 3
19:01:26 Done 1
19:01:26 Hello 1
19:01:27 Done 2
19:01:27 Hello 2
19:01:27 Done 1
Alternative with Python 3.9+ (inside a coroutine):
    await asyncio.to_thread(blocking)
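A minimal runnable sketch of the same pattern; to_thread() submits to the event loop's default thread pool:
import asyncio
import time

def synchronous(i):
    time.sleep(i)  # blocking work, runs in a worker thread
    return i

async def main():
    # both blocking calls run concurrently in worker threads
    print(await asyncio.gather(asyncio.to_thread(synchronous, 1),
                               asyncio.to_thread(synchronous, 2)))  # [1, 2]

asyncio.run(main())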
Coupled with generators
# async generator
async def coroutine_generator(n):
    for i in range(n):
        yield i*i

# usage of an async generator
async def print_gen(n):
    # `async for` uses __aiter__() and __anext__()
    async for i in coroutine_generator(n):
        print(i)

await print_gen(5)  # 0 1 4 9 16 (top-level await works in the asyncio REPL)
# alternatively: asyncio.run(print_gen(5))
Libraries
Machine Learning
- tslearn: time series analysis
- scikit-learn: classical machine learning
- sktime: time series model (classification, clustering) compatible with scikit-learn interfaces
- kats: a kit to analyze time series data: key statistics and characteristics, change point and anomaly detection, and forecasting of future trends
- tsfresh: time series feature extraction
- DESlib: easy-to-use ensemble learning library focused on the implementation of state-of-the-art techniques for dynamic classifier and ensemble selection
- PyOD: outlier detection
- river: online machine learning with streaming data
- unionml: deploy machine learning microservices
- Flyte: workflow automation platform for complex, mission-critical data, and ML processes at scale
- ALEPython: Accumulated Local Effects (ALE) interpretability
Statistics
- Pingouin: ANOVA, post-hocs (parametric and non-parametric), multivariate
- statsmodels (see also scikit posthocs)
Misc
- modin: scale pandas (using dask or ray)
- polars: alternative to pandas written in Rust
- streamlit: create web apps for machine learning projects
- luma.lcd: display drivers for HD44780, PCD8544, ST7735, ST7789, ST7567, HT1621, UC1701X, ILI9341
- ruptures: time series change point detection in Python
- SymPy: computer algebra system
- Adafruit CircuitPython RFM69: CircuitPython RFM69 packet radio module; supports basic RadioHead-compatible sending and receiving of packets with RFM69 series radios (433/915 MHz)
- aiohttp: asynchronous HTTP client/server framework for asyncio
- aiocache: asyncio cache manager for redis, memcached and memory
Numpy
Here a and b are 1-D arrays (e.g. from np.random.rand(4)); for 1-D arrays transposing is a no-op, so the dot-product forms below are all equivalent to np.sum(a*b) (the last digit differs only because the floating-point summation order differs):
>>> a
array([0.34399327, 0.51971385, 0.42075315, 0.65919112])
>>> b
array([0.42685801, 0.52210862, 0.52210862, 0.52210862])
>>> a*b
array([0.14683628, 0.27134708, 0.21967885, 0.34416937])
>>> np.sum(a*b)
0.9820315790772367
>>> a.dot(np.transpose(b))
0.9820315790772366
>>> a.dot(b.T)
0.9820315790772366
>>> a.dot(b)
0.9820315790772366
Pandas
Combine multiple rows of lists into one big list
lst = df['col_of_lists'].explode()
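For example, with a hypothetical df (explode() emits one row per list element):
>>> import pandas as pd
>>> df = pd.DataFrame({'col_of_lists': [[1, 2], [3], [4, 5]]})
>>> df['col_of_lists'].explode().tolist()
[1, 2, 3, 4, 5]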
Make Pandas DataFrame apply() use all cores?
import multiprocessing as mp

with mp.Pool(mp.cpu_count()) as pool:
    df['newcol'] = pool.map(f, df['col'])
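A self-contained sketch of the same idea; f must be a picklable top-level function, and the `if __name__ == '__main__'` guard is required on spawn-based platforms:
import multiprocessing as mp
import pandas as pd

def f(x):  # top-level so it can be pickled and sent to workers
    return x * 10

if __name__ == '__main__':
    df = pd.DataFrame({'col': [1, 2, 3]})
    with mp.Pool(mp.cpu_count()) as pool:
        df['newcol'] = pool.map(f, df['col'])
    print(df['newcol'].tolist())  # [10, 20, 30]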