Source

fs-async-iterable.class.ts

/**
 * This is util class not inteded to use directly, It used for iterate through dirents.
 */
export class FSAsyncIterable<T> implements AsyncIterable<T> {
  constructor(private readonly iterable: AsyncIterable<T>) {}

  [Symbol.asyncIterator]() {
    return this.iterable[Symbol.asyncIterator]();
  }

  /**
   * Transform input items
   * @param callback - async function (must return Promise),
   * which transform input item to output item
   * @param parallel - number of callbacks runs in parallel
   */
  map<P>(callback: (item: T) => Promise<P>, parallel: number = 1) {
    const iterable = this.iterable;
    const mapAsyncGenerator = async function*() {
      const pending = new Set<Promise<P>>();
      const finished = new Set<Promise<P>>();
      for await (const item of iterable) {
        const current = callback(item);
        pending.add(current);
        current
          .catch(() => {
            /* do nothing */
          })
          .finally(() => {
            pending.delete(current);
            finished.add(current);
          });
        if (pending.size >= parallel) await Promise.race(pending);

        for (const item of Array.from(finished)) {
          yield item;
          finished.delete(item);
        }
      }
      while (pending.size > 0) {
        await Promise.race(pending);
        for (const item of Array.from(finished)) {
          yield item;
          finished.delete(item);
        }
      }
      yield* finished;
    };
    return new FSAsyncIterable(mapAsyncGenerator());
  }

  /**
   * Pass items only when callback returns true.
   * @param callback - async function (must return Promise).
   * @param parallel - number of callbacks runs in parallel
   */
  filter(callback: (item: T) => Promise<boolean>, parallel: number = 1) {
    const iterable = this.iterable;
    const filterAsyncGenerator = async function*() {
      const pending = new Set<Promise<boolean>>();
      const finished = new Set<T>();
      for await (const item of iterable) {
        const current = callback(item);
        pending.add(current);
        current
          .then(res => {
            if (res) finished.add(item);
          })
          .catch(() => {
            /* do nothing */
          })
          .finally(() => {
            pending.delete(current);
          });
        if (pending.size >= parallel) await Promise.race(pending);

        for (const item of Array.from(finished)) {
          yield item;
          finished.delete(item);
        }
      }
      while (pending.size > 0) {
        await Promise.race(pending);
        for (const item of Array.from(finished)) {
          yield item;
          finished.delete(item);
        }
      }
      yield* finished;
    };
    return new FSAsyncIterable(filterAsyncGenerator());
  }

  /**
   * Calls the specified callback function for all the input elements.
   * The return value of the callback function is the accumulated result,
   * and is provided as an argument in the next call to the callback function.
   * @param callback A function that accepts up to four arguments.
   * The reduce method calls the callbackfn function one time for each element in the array.
   * Must return promise or be async.
   * @param initialValue initialValue is used as the initial value to start the accumulation.
   * The first call to the callbackfn function provides this value as an argument.
   */
  async reduce<U>(callback: (previousValue: U, currentValue: T) => Promise<U>, initialValue: U): Promise<U> {
    let previousValue = initialValue;
    for await (const item of this.iterable) {
      previousValue = await callback(previousValue, item);
    }
    return previousValue;
  }

  /**
   * The forEach() method executes a provided function once for each array element
   * @param callback - async function (must return Promise).
   * @param parallel - max items, that can be processed at same time. Optional parameter, equals 1 by default
   */
  async forEach(callback: (item: T) => Promise<void>, parallel: number = 1) {
    const activePromises = new Set();
    for await (const item of this.iterable) {
      const currentPromise = callback(item);
      activePromises.add(currentPromise);
      currentPromise
        .catch(_ => {
          /* do nothing */
        })
        .finally(() => {
          activePromises.delete(currentPromise);
        });

      if (activePromises.size >= parallel) await Promise.race(activePromises);
    }
    await Promise.all(activePromises);
  }
  /**
   *The toArray() method collect itertor items to array.
   */
  async toArray() {
    return await this.reduce(async (acc, item) => {
      acc.push(item);
      return acc;
    }, [] as T[]);
  }
}