Python

    Hello Summary

    CONSTANT = 10
    
    # recursive, exception, conditions, lambda
    def fib(n):
      if n < 0:
        raise Exception('n={}, should be positive'.format(n))
    
      if n == 0:
        return 0
    
      check_n = lambda n: n == 1 or n == 2
    
      if check_n(n):
        return 1
    
      return fib(n-1) + fib(n-2)
    
    # generator
    def gen(n):
      yield 1
      for i in range(5, n):
        yield i
    
    # class, attribute, method
    class math: # note: the name shadows the stdlib math module
      def __init__(self, c=CONSTANT):
        self.c = c
    
      @staticmethod
      def addition(a, b=2):
        return a+b
    
      def times_ten_and_hundred(self, n):
        return n*self.c, n*self.c**2
    
    if __name__ == '__main__':
      # variables
      a, b = 1, 3
    
      # list/array(l), tuple(t), dict(d)
      l, t, d = [a, b], (a, b), {'a': a, 'b': b}
    
      # generator
      g = gen(6)
      # next(g) is equivalent to calling g.__next__()
      print(next(g)) # 1
      print(next(g)) # 5
      # print(next(g)) # raises StopIteration
    
      try:
        fib(-1)
      except Exception as e:
        print("error: {}".format(e)) # error: n=-1, should be positive
    
      print(math.addition(a, b=b)) # 4
      print(math.addition(d['a'])) # 3
    
      # instance
      r1, r2 = math().times_ten_and_hundred(5)
      assert r1 == 50 # ok
      assert r2 == 500 # ok
    

    Data Structure

    List

    >>> a = []
    >>> a.append(1)
    >>> a.append(2)
    >>> a.append(3)
    >>> a.pop() # stack <=> lifo
    3
    >>> a.pop(0) # queue <=> fifo (if implemented on linkedlist, complexity O(1) to add/del element at beginning)
    1
    >>> a
    [2]
    

    Note: a list may hold elements of different types (int, dict, ...), while the array module only stores elements of one declared type.
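
    A quick illustration, assuming the standard array module:

    >>> mixed = [1, 'two', {'three': 3}] # fine: a list accepts mixed types
    >>> import array
    >>> arr = array.array('i', [1, 2, 3]) # 'i' = signed int only
    >>> arr.append('four') # raises TypeError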

    Linked List

    See linked list
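
    For the O(1) head operations mentioned in the List note above, a minimal sketch using collections.deque (implemented as a linked structure in CPython):

    >>> from collections import deque
    >>> q = deque([1, 2, 3])
    >>> q.appendleft(0) # O(1) at the head
    >>> q.popleft() # O(1) at the head
    0
    >>> q.pop() # O(1) at the tail
    3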

    Sorting algorithms

    See sorting

    Exceptions

    class Custom(Exception):
      pass
    
    try:
      print(does_not_exists)
    except Exception as e:
      raise Custom("Your variable does not exist") from e
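
    Running this chains the original NameError into Custom; the (abridged) traceback shows both, joined by the standard marker:

    NameError: name 'does_not_exists' is not defined

    The above exception was the direct cause of the following exception:
    ...
    Custom: Your variable does not exist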
    

    Typing

    from __future__ import annotations
    
    def concat(a: list | str, b: list | str) -> list | str:
      return a + b
    
    >>> concat("hello", " world")
    'hello world'
    >>> concat([1, 2], [3, 4])
    [1, 2, 3, 4]
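
    Note: from __future__ import annotations makes the X | Y syntax legal in annotations from Python 3.7 on; since Python 3.10 the union also exists at runtime, e.g. for isinstance checks:

    >>> isinstance(1, int | str) # Python 3.10+
    True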
    

    See pep-0604

    Concurrency Compute (thread/process)

    See concurrent.futures

    Example 1 (custom)

    from concurrent import futures
    import time
    
    def task(seq):
        print('Start:', time.strftime('%X'))
        time.sleep(max(seq))
        print('End:', time.strftime('%X'))
        return list(reversed(seq))
    
    def main():
        max_workers = 2
        timeout = 300
    
        jobs = []
        not_done = set()
        pendings = 0
    
        with futures.ProcessPoolExecutor(max_workers=max_workers) as executor: # you can also use futures.ThreadPoolExecutor
            for seq in [[3,2,1], [10,5,0]]:
                if pendings >= max_workers: # throttle submissions to the pool size
                    _u, not_done = futures.wait(not_done, timeout, return_when=futures.FIRST_COMPLETED)
                    pendings = len(not_done)
                    print(f"{pendings} jobs pending")
                job = executor.submit(task, seq)
                jobs.append(job)
                not_done.add(job)
                pendings += 1
    
        done, not_done = futures.wait(jobs, timeout, return_when=futures.ALL_COMPLETED)
        results = [d.result() for d in done]
        return results
    
    >>> r = main()
    Start: 15:08:39
    End: 15:08:49
    Start: 15:08:49
    End: 15:08:52
    
    >>> r
    [[1, 2, 3], [0, 5, 10]]
    

    Example 2 (shortened)

    from concurrent import futures
    import time
    
    def task(seq):
      print('Start:', time.strftime('%X'))
      time.sleep(max(seq))
      print('End:', time.strftime('%X'))
      return list(reversed(seq))
    
    def main():
      results = []
      with futures.ThreadPoolExecutor(max_workers=2) as executor:
        tasks = {executor.submit(task, seq=seq) for seq in [[3,2,1],[10,5,0]]}
        for completed_task in futures.as_completed(tasks, timeout=60):
          results.append(completed_task.result())
      return results
    
    >>> r = main()
    Start: 12:30:36
    Start: 12:30:36
    End: 12:30:39
    End: 12:30:46
    
    >>> r
    [[1, 2, 3], [0, 5, 10]]
    

    Concurrency IO (Async)

    Useful when reading from or writing to disk or network devices (e.g. TCP connections).

    Classic usage

    import asyncio
    import time
    
    async def coroutine(seq):
      await asyncio.sleep(max(seq))
      return list(reversed(seq))

    async def main():
      t = asyncio.create_task(coroutine([3, 2, 1]))
      t2 = asyncio.create_task(coroutine([10, 5, 0]))  # Python 3.7+
      print('Start:', time.strftime('%X'))
      a1 = await asyncio.gather(t, t2)
      print('End:', time.strftime('%X'))  # should be 10 seconds (both tasks sleep concurrently)
      print(f'Both tasks done: {all((t.done(), t2.done()))}')
      print('Start:', time.strftime('%X'))
      a2 = await asyncio.gather(coroutine([1,2,3]), coroutine([1,2,2]))
      print('End:', time.strftime('%X'))  # should be 3 seconds
      return a1, a2
    

    Note: don't use time.sleep() inside coroutines: it blocks the whole event loop; use await asyncio.sleep() instead (see the sketch after the source link below).

    >>> a1, a2 = asyncio.run(main())
    Start: 15:08:39
    End: 15:08:49
    Both tasks done: True
    Start: 15:08:49
    End: 15:08:52
    
    >>> a1
    [[1, 2, 3], [0, 5, 10]]
    

    source
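
    A minimal sketch of the difference, assuming nothing beyond the standard library: time.sleep() blocks the loop so the coroutines run one after the other, while asyncio.sleep() lets them overlap.

    import asyncio
    import time

    async def bad(i):
      time.sleep(1)  # blocks the event loop: the three calls run sequentially
      return i

    async def good(i):
      await asyncio.sleep(1)  # yields to the loop: the three calls overlap
      return i

    async def main():
      t0 = time.perf_counter()
      await asyncio.gather(*(bad(i) for i in range(3)))
      print('with time.sleep:', round(time.perf_counter() - t0))  # ~3 seconds

      t0 = time.perf_counter()
      await asyncio.gather(*(good(i) for i in range(3)))
      print('with asyncio.sleep:', round(time.perf_counter() - t0))  # ~1 second

    asyncio.run(main())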

    Run lots of asynchronous functions with the event loop

    import asyncio
    import time
    
    loop = asyncio.new_event_loop() # or, from inside a coroutine: loop = asyncio.get_running_loop()
    
    # loop.run_in_executor() uses a ThreadPoolExecutor by default, but you can switch to processes:
    # import concurrent.futures
    # loop.set_default_executor(concurrent.futures.ProcessPoolExecutor())
    # you can also use: 
    # with concurrent.futures.ProcessPoolExecutor() as pool_executor:
    #   loop.run_in_executor(pool_executor, func, params)
    
    async def send_request(id):
      while not loop.is_closed():
        print(time.strftime('%X'), ' Hello', id)
        await asyncio.sleep(id)  # non-blocking, so other tasks can run, see https://stackoverflow.com/questions/56729764/asyncio-sleep-vs-time-sleep
        print(time.strftime('%X'), ' Done', id)
        # alternative: drop the `while` loop and reschedule the coroutine as a fresh task instead;
        # it works, but destroying/recreating a task each iteration is more costly:
        # loop.create_task(send_request(id))
    
    for i in range(1,32):
      loop.create_task(send_request(i))
    
    try:
      loop.run_forever()
    except KeyboardInterrupt:
      for task in asyncio.all_tasks(loop):
        task.cancel()
    finally:
      loop.close()
    

    Result:

    11:40:26  Hello 1
    11:40:26  Hello 2
    11:40:26  Hello 3
    11:40:26  Hello 4
    11:40:26  Hello 5
    11:40:26  Hello 6
    ...
    11:40:26  Hello 30
    11:40:26  Hello 31
    11:40:27  Done 1
    11:40:27  Hello 1
    11:40:28  Done 2
    11:40:28  Done 1
    11:40:28  Hello 2
    11:40:28  Hello 1
    11:40:29  Done 3
    11:40:29  Hello 3
    ...
    

    Run synchronous functions in different threads

    Useful when you can't modify a function (signature or body) but still want to run it in a thread or process while other tasks continue in parallel. Careful: if something inside the function blocks, it will monopolize its thread, which becomes a problem when you have more calls than available threads.

    import asyncio
    import time
    import concurrent.futures
    
    def synchronous(i):
      print(time.strftime('%X'), ' Hello', i)
      time.sleep(i) # simulate work that can't be predicted or changed
      print(time.strftime('%X'), ' Done', i)
    
    async def sync_wrapper(i):
      loop = asyncio.get_running_loop()
    
      while True:  # optional: in this example each sync function is rescheduled forever
        # NOTE executor=None also works: the loop then falls back to its
        # default ThreadPoolExecutor, so the call still runs in a thread
        with concurrent.futures.ThreadPoolExecutor() as executor:
          await loop.run_in_executor(executor, synchronous, i)
    
    loop = asyncio.new_event_loop() # or, from inside a coroutine: loop = asyncio.get_running_loop()
    
    for i in range(1,4):
      loop.create_task(sync_wrapper(i))
    
    try:
      loop.run_forever()
    finally:
      loop.close()
    

    Result:

    19:01:23  Hello 1
    19:01:23  Hello 2
    19:01:23  Hello 3
    19:01:24  Done 1
    19:01:24  Hello 1
    19:01:25  Done 2
    19:01:25  Hello 2
    19:01:25  Done 1
    19:01:25  Hello 1
    19:01:26  Done 3
    19:01:26  Hello 3
    19:01:26  Done 1
    19:01:26  Hello 1
    19:01:27  Done 2
    19:01:27  Hello 2
    19:01:27  Done 1
    
    Alternative with Python 3.9+ (see the sketch below):
    await asyncio.to_thread(synchronous, i)
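
    A minimal runnable sketch of the to_thread variant, reusing the synchronous function above (without the endless rescheduling):

    import asyncio
    import time

    def synchronous(i):
      print(time.strftime('%X'), ' Hello', i)
      time.sleep(i)
      print(time.strftime('%X'), ' Done', i)

    async def main():
      # each blocking call runs in the loop's default thread pool
      await asyncio.gather(*(asyncio.to_thread(synchronous, i) for i in range(1, 4)))

    asyncio.run(main())  # Python 3.9+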
    

    source

    Coupled with generators

    # generator async
    async def coroutine_generator(n):
      for i in range(n):
        yield i*i
    
    # usage of generator async
    async def print_gen(n):
      # async for uses `__aiter__()` and `__anext__()` under the hood
      async for i in coroutine_generator(n):
        print(i)
    
    # top-level await works in an async REPL (python -m asyncio)
    await print_gen(5) # 0 1 4 9 16
    # alternatively: import asyncio; asyncio.run(print_gen(5))
    


    Libraries

    Machine Learning

    • tslearn: time series analysis
    • scikit-learn: classical machine learning
    • sktime: time series model (classification, clustering) compatible with scikit-learn interfaces
    • kats: a kit to analyze time series data: key statistics and characteristics, change point and anomaly detection, and forecasting of future trends
    • tsfresh: time series feature extraction
    • DESlib: easy-to-use ensemble learning library focused on the implementation of the state-of-the-art techniques for dynamic classifier and ensemble selection
    • PyOD: outlier detection
    • river: online machine learning with streaming data
    • unionml: deploy machine learning microservices
    • Flyte: workflow automation platform for complex, mission-critical data, and ML processes at scale
    • ALEPython: Accumulated Local Effects (ALE) interpretability

    Statistics

    Misc

    • modin: scale pandas (using dask or ray)
    • polars: alternative to pandas written in rust
    • streamlit: create web apps for machine learning projects
    • luma.lcd: display drivers for HD44780, PCD8544, ST7735, ST7789, ST7567, HT1621, UC1701X, ILI9341
    • ruptures: time series change point detection in Python
    • SymPy: algebra system
    • Adafruit CircuitPython RFM69: CircuitPython RFM69 packet radio module. This supports basic RadioHead-compatible sending and receiving of packets with RFM69 series radios (433/915 MHz).
    • aiohttp: asynchronous HTTP client/server framework for asyncio (maintained by a CPython core developer)
    • aiocache: Asyncio cache manager for redis, memcached and memory

    Numpy

    >>> import numpy as np
    >>> a
    array([0.34399327, 0.51971385, 0.42075315, 0.65919112])
    >>> b
    array([0.42685801, 0.52210862, 0.52210862, 0.52210862])
    >>> a*b
    array([0.14683628, 0.27134708, 0.21967885, 0.34416937])
    >>> np.sum(a*b) # same dot product; the last digit differs due to floating-point summation order
    0.9820315790772367
    >>> a.dot(np.transpose(b)) # transpose is a no-op on 1-D arrays
    0.9820315790772366
    >>> a.dot(b.T)
    0.9820315790772366
    >>> a.dot(b)
    0.9820315790772366
    

    Pandas

    Combine multiple rows of lists into one big list

    lst = df['col_of_lists'].explode().tolist()
    

    source
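
    A quick check, assuming a toy DataFrame:

    import pandas as pd

    df = pd.DataFrame({'col_of_lists': [[1, 2], [3], [4, 5]]})
    print(df['col_of_lists'].explode().tolist()) # [1, 2, 3, 4, 5]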

    Make Pandas DataFrame apply() use all cores?

    import multiprocessing as mp

    # f is the function you would otherwise pass to df['col'].apply(f)
    with mp.Pool(mp.cpu_count()) as pool:
        df['newcol'] = pool.map(f, df['col'])
    

    source
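
    A runnable sketch of the same idea, assuming a toy f and DataFrame (the __main__ guard matters because multiprocessing may re-import the module):

    import multiprocessing as mp
    import pandas as pd

    def f(x):
        return x * 10

    if __name__ == '__main__':
        df = pd.DataFrame({'col': [1, 2, 3, 4]})
        with mp.Pool(mp.cpu_count()) as pool:
            df['newcol'] = pool.map(f, df['col'])
        print(df['newcol'].tolist())  # [10, 20, 30, 40]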

    FAQ