from collections.abc import Sequence
import random
from typing import Optional, List, Tuple

import gradio as gr
import spaces
import torch
from transformers import (
    AutoModelForCausalLM,
    AutoTokenizer,
    BayesianDetectorModel,
    SynthIDTextWatermarkingConfig,
    SynthIDTextWatermarkDetector,
    SynthIDTextWatermarkLogitsProcessor,
)

# If the watermark is not detected, consider the use case. It could be because
# of the nature of the task (e.g., factual responses are lower entropy), or it
# could be due to another factor.

_MODEL_IDENTIFIER = 'google/gemma-2b-it'
_DETECTOR_IDENTIFIER = 'google/synthid-spaces-demo-detector'

_PROMPTS: Tuple[str] = (
    'Write an essay about my pets, a cat named Mika and a dog named Cleo.',
    '',
    '',
)

_TORCH_DEVICE = (
    torch.device("cuda:0") if torch.cuda.is_available() else torch.device("cpu")
)
_ANSWERS: List[Tuple[str, str]] = []

_WATERMARK_CONFIG_DICT = dict(
    ngram_len=5,
    keys=[
        654,
        400,
        836,
        123,
        340,
        443,
        597,
        160,
        57,
        29,
        590,
        639,
        13,
        715,
        468,
        990,
        966,
        226,
        324,
        585,
        118,
        504,
        421,
        521,
        129,
        669,
        732,
        225,
        90,
        960,
    ],
    sampling_table_size=2**16,
    sampling_table_seed=0,
    context_history_size=1024,
)

# Generation-side watermarking configuration built from the shared settings;
# the UI handler passes it to `generate_outputs` for the watermarked run.
_WATERMARK_CONFIG = SynthIDTextWatermarkingConfig(
    **_WATERMARK_CONFIG_DICT
)

# Tokenizer for the generation model, configured to pad on the left and to
# reuse the end-of-sequence token id as the padding token id.
tokenizer = AutoTokenizer.from_pretrained(
    _MODEL_IDENTIFIER, padding_side="left"
)
tokenizer.pad_token_id = tokenizer.eos_token_id

# Load the causal language model and place it on the selected device.
model = AutoModelForCausalLM.from_pretrained(_MODEL_IDENTIFIER)
model.to(_TORCH_DEVICE)

# Logits processor built from the same settings dict as `_WATERMARK_CONFIG`;
# it is handed to the detector below.
logits_processor = SynthIDTextWatermarkLogitsProcessor(
    **_WATERMARK_CONFIG_DICT,
    device=_TORCH_DEVICE,
)

# Pre-trained detector weights, placed on the same device as the model.
detector_module = BayesianDetectorModel.from_pretrained(_DETECTOR_IDENTIFIER)
detector_module.to(_TORCH_DEVICE)

# Callable detector assembled from the detector weights, the logits processor
# and the tokenizer; `generate_outputs` calls it on generated token ids.
detector = SynthIDTextWatermarkDetector(
    detector_module=detector_module,
    logits_processor=logits_processor,
    tokenizer=tokenizer,
)


@spaces.GPU
def generate_outputs(
    prompts: Sequence[str],
    watermarking_config: Optional[SynthIDTextWatermarkingConfig] = None,
) -> Tuple[Sequence[str], torch.Tensor]:
  """Samples one response per prompt and scores each response with the detector.

  Args:
    prompts: Non-empty sequence of prompt strings; they are tokenized together
      as a single batch padded to the longest prompt.
    watermarking_config: Optional SynthID Text configuration forwarded to
      `model.generate()`. NOTE(review): `None` presumably leaves watermarking
      disabled -- confirm against the Transformers documentation.

  Returns:
    A pair `(texts, detections)`. `texts` holds the decoded responses with the
    prompt columns and special tokens removed; `detections` is whatever the
    module-level `detector` returns for the generated token ids (callers in
    this file index it with `[0]` to obtain the per-response scores).

  Raises:
    ValueError: If `prompts` is empty.
  """
  if not prompts:
    # Fail with a clear message instead of an opaque tokenizer/generate error.
    raise ValueError('At least one prompt is required.')

  tokenized_prompts = tokenizer(
      prompts, return_tensors='pt', padding="longest"
  ).to(_TORCH_DEVICE)
  # All rows share the same padded prompt width, so one column offset is
  # enough to separate prompt tokens from generated tokens below.
  input_length = tokenized_prompts.input_ids.shape[1]
  output_sequences = model.generate(
      **tokenized_prompts,
      watermarking_config=watermarking_config,
      do_sample=True,
      max_length=500,
      top_k=40,
  )
  # Keep only the columns after the prompt so that decoding and detection see
  # the response tokens alone.
  output_sequences = output_sequences[:, input_length:]
  # Detection is inference-only: skip autograd bookkeeping so no graph is
  # retained for the detector's forward pass.
  with torch.no_grad():
    detections = detector(output_sequences)
  return (
      tokenizer.batch_decode(output_sequences, skip_special_tokens=True),
      detections,
  )


with gr.Blocks() as demo:
  gr.Markdown(
    '''
    # Using SynthID Text in your Generative AI projects

    [SynthID][synthid] is a Google DeepMind technology that watermarks and
    identifies AI-generated content by embedding digital watermarks directly
    into AI-generated images, audio, text or video.

    SynthID Text is an open source implementation of this technology available
    in Hugging Face Transformers that has two major components:

    *   A [logits processor][synthid-hf-logits-processor] that is
        [configured][synthid-hf-config] on a per-model basis and activated when
        calling `.generate()`; and
    *   A [detector][synthid-hf-detector] trained to recognize watermarked text
        generated by a specific model with a specific configuration.

    This Space demonstrates:

    1.  How to use SynthID Text to apply a watermark to text generated by your
        model; and
    1.  How to identify that text using a ready-made detector.

    Note that this detector is trained specifically for this demonstration. You
    should maintain a specific watermarking configuration for every model you
    use and protect that configuration as you would any other secret. See the
    [end-to-end guide][synthid-hf-detector-e2e] for more on training your own
    detectors, and the [SynthID Text documentation][raitk-synthid] for more on
    how this technology works.

    ## Applying a watermark

    Practically speaking, SynthID Text is a logits processor, applied to your
    model's generation pipeline after [Top-K and Top-P][cloud-parameter-values],
    that augments the model's logits using a pseudorandom _g_-function to encode
    watermarking information in a way that balances generation quality with
    watermark detectability. See the [paper][synthid-nature] for a complete
    technical description of the algorithm and analyses of how different
    configuration values affect performance.

    Watermarks are [configured][synthid-hf-config] to parameterize the
    _g_-function and how it is applied during generation. The following
    configuration is used for all demos. It should not be used for any
    production purposes.

    ```json
    {
        "ngram_len": 5,
        "keys": [
            654, 400, 836, 123, 340, 443, 597, 160,  57,  29,
            590, 639,  13, 715, 468, 990, 966, 226, 324, 585,
            118, 504, 421, 521, 129, 669, 732, 225,  90, 960
        ],
        "sampling_table_size": 65536,
        "sampling_table_seed": 0,
        "context_history_size": 1024
    }
    ```

    Watermarks are applied by initializing a `SynthIDTextWatermarkingConfig`
    and passing that as the `watermarking_config=` parameter in your call to
    `.generate()`, as shown in the snippet below.

    ```python
    from transformers import (
        AutoModelForCausalLM,
        AutoTokenizer,
        SynthIDTextWatermarkingConfig,
    )

    # Standard model and tokenizer initialization
    tokenizer = AutoTokenizer.from_pretrained('repo/id')
    model = AutoModelForCausalLM.from_pretrained('repo/id')

    # SynthID Text configuration
    watermarking_config = SynthIDTextWatermarkingConfig(...)

    # Generation with watermarking
    tokenized_prompts = tokenizer(["your prompts here"], return_tensors='pt')
    output_sequences = model.generate(
        **tokenized_prompts,
        watermarking_config=watermarking_config,
        do_sample=True,
    )
    watermarked_text = tokenizer.batch_decode(output_sequences)
    ```

    ## Try it yourself.

    Let's use [Gemma 2B IT][gemma] to help you understand how watermarking works.

    Using the text boxes below enter up to three prompts then click the generate
    button. An example is provided to help get you started, but the cells are
    fully editable.

    Gemma will then generate watermarked and non-watermarked responses for each
    non-empty prompt you provided.

    [cloud-parameter-values]: https://cloud.google.com/vertex-ai/generative-ai/docs/learn/prompts/adjust-parameter-values
    [gemma]: https://huggingface.co/google/gemma-2b-it
    [raitk-synthid]: https://ai.google.dev/responsible/docs/safeguards/synthid
    [synthid]: https://deepmind.google/technologies/synthid/
    [synthid-hf-config]: https://huggingface.co/docs/transformers/v4.46.0/en/internal/generation_utils#transformers.SynthIDTextWatermarkingConfig
    [synthid-hf-detector]: https://huggingface.co/docs/transformers/v4.46.0/en/internal/generation_utils#transformers.BayesianDetectorModel
    [synthid-hf-detector-e2e]: https://github.com/huggingface/transformers/tree/v4.46.0/examples/research_projects/synthid_text/detector_training.py
    [synthid-hf-logits-processor]: https://huggingface.co/docs/transformers/v4.46.0/en/internal/generation_utils#transformers.SynthIDTextWatermarkLogitsProcessor
    [synthid-nature]: https://www.nature.com/articles/s41586-024-08025-4
    '''
  )
  # One editable text box per seed prompt, followed by the button that starts
  # generation.
  prompt_inputs = [
      gr.Textbox(label='Prompt', lines=4, value=seed_prompt)
      for seed_prompt in _PROMPTS
  ]
  generate_btn = gr.Button(value='Generate')

  with gr.Column(visible=False) as generations_col:
    gr.Markdown(
      '''
      ## Human recognition of watermarked text

      The primary goal of SynthID Text is to apply a watermark to generated text
      without affecting generation quality. Another way to think about this is
      that generated text that carries a watermark should be imperceptible to
      you, the reader, but easily perceived by a watermark detector.

      The responses from Gemma are shown below. Use the checkboxes to mark which
      responses you think are watermarked, then click the "reveal" button to
      see the true values.

      The [research paper][synthid-nature] has an in-depth study examining human
      perception of watermarked versus non-watermarked text.

      [synthid-nature]: https://www.nature.com/articles/s41586-024-08025-4
      '''
    )
    # Checkbox list of the generated responses; it is created without choices
    # here and the `generate` handler supplies them after generation.
    generations_grp = gr.CheckboxGroup(
        label='All generations, in random order',
        info='Select the generations you think are watermarked!',
    )
    # Hidden at start-up; the `generate` handler makes it visible.
    reveal_btn = gr.Button('Reveal', visible=False)

  with gr.Column(visible=False) as detections_col:
    gr.Markdown(
      '''
      ## Detecting watermarked text

      The only way to properly detect watermarked text is with a trained
      classifier. This Space uses a pre-trained classifier hosted on Hugging Face
      Hub. For production uses you will need to train your own classifiers to
      recognize your watermarks. A [Bayesian detector][synthid-hf-detector] is
      provided in Transformers, along with an
      [end-to-end example][synthid-hf-detector-e2e] of how to train one of these
      detectors.

      You can see how your guesses compared to the actual results below. As
      above, the responses are displayed in checkboxes. If the box is checked,
      then the text carries a watermark. Your correct guesses are annotated with
      the "Correct" prefix.

      [synthid-hf-detector]: https://huggingface.co/docs/transformers/v4.46.0/en/internal/generation_utils#transformers.BayesianDetectorModel
      [synthid-hf-detector-e2e]: https://github.com/huggingface/transformers/tree/v4.46.0/examples/research_projects/synthid_text/detector_training.py
      '''
    )
    # Checkbox list showing the detector's verdicts; it is created without
    # choices here and the `reveal` handler supplies choices and checked values.
    revealed_grp = gr.CheckboxGroup(
        label='Ground truth for all generations',
        info=(
            'Watermarked generations are checked, and your selection are '
            'marked as correct or incorrect in the text.'
        ),
    )
    gr.Markdown(
      '''
      ## Limitations

      SynthID Text watermarks are robust to some transformations, such as
      cropping pieces of text, modifying a few words, or mild paraphrasing, but
      this method does have limitations.

      - Watermark application is less effective on factual responses, as there
        is less opportunity to augment generation without decreasing accuracy.
      - Detector confidence scores can be greatly reduced when an AI-generated
        text is thoroughly rewritten, or translated to another language.

      SynthID Text is not built to directly stop motivated adversaries from
      causing harm. However, it can make it harder to use AI-generated content
      for malicious purposes, and it can be combined with other approaches to
      give better coverage across content types and platforms.
      '''
    )
    # Hidden at start-up; the `reveal` handler makes it visible so the user can
    # start another round.
    reset_btn = gr.Button('Reset', visible=False)

  # Per-session store of (response text, detector decision) pairs for the
  # current round. A `gr.State` is used rather than a module-level list so that
  # concurrent visitors cannot read or overwrite each other's answers.
  answers_state = gr.State([])

  def generate(*prompts):
    """Generates, scores and shuffles responses for the non-blank prompts.

    Each prompt is run twice through `generate_outputs`: once without and once
    with the watermarking configuration. Every response is labelled from its
    detector score as 'Watermarked', 'Indeterminate' or 'Not watermarked'.

    Args:
      *prompts: Current values of the prompt text boxes.

    Returns:
      A component-to-update mapping. With at least one usable prompt it stores
      the shuffled (text, decision) pairs in the session state and shows the
      generations column; otherwise it only re-enables the generate button.
    """
    prompts = [p for p in prompts if p and p.strip()]
    if not prompts:
      # The preceding click step disabled the button and relabelled it
      # 'Generating...'; restore it instead of failing during tokenization.
      gr.Warning('Please enter at least one prompt.')
      return {
          generate_btn: gr.Button(value='Generate', interactive=True),
      }

    standard, standard_detector = generate_outputs(prompts=prompts)
    watermarked, watermarked_detector = generate_outputs(
        prompts=prompts,
        watermarking_config=_WATERMARK_CONFIG,
    )
    upper_threshold = 0.9501
    lower_threshold = 0.1209

    def decision(score: float) -> str:
      """Maps a detector score onto one of the three verdict labels."""
      if score > upper_threshold:
        return 'Watermarked'
      # A score exactly equal to the upper threshold belongs to the middle
      # band rather than falling through to 'Not watermarked'.
      if score > lower_threshold:
        return 'Indeterminate'
      return 'Not watermarked'

    # Index 0 of the detector output holds one score per response.
    responses = [
        (text, decision(float(score)))
        for text, score in zip(standard, standard_detector[0])
    ]
    responses += [
        (text, decision(float(score)))
        for text, score in zip(watermarked, watermarked_detector[0])
    ]
    # Shuffle so the display order does not give away which run a response
    # came from.
    random.shuffle(responses)

    return {
        answers_state: responses,
        generate_btn: gr.Button(visible=False),
        generations_col: gr.Column(visible=True),
        # Clear selections left over from a previous round; they would no
        # longer match the new choices.
        generations_grp: gr.CheckboxGroup(
            choices=[text for text, _ in responses],
            value=[],
        ),
        reveal_btn: gr.Button(visible=True),
    }

  # First give immediate feedback on the button, then run the slow generation.
  generate_btn.click(
      lambda: gr.update(value='Generating...', interactive=False),
      None,
      generate_btn,
  ).then(
      generate,
      inputs=prompt_inputs,
      outputs=[
          answers_state,
          generate_btn,
          generations_col,
          generations_grp,
          reveal_btn,
      ],
  )

  def reveal(user_selections: list[str], answers: list[tuple[str, str]]):
    """Builds the verdict checkbox list for the current session's answers.

    Args:
      user_selections: Response texts the user marked as watermarked.
      answers: The session's (response text, detector decision) pairs.

    Returns:
      A component-to-update mapping that shows the detections column. Responses
      the detector labelled 'Watermarked' are checked, and those the user also
      selected are prefixed with 'Correct! '.
    """
    selected = set(user_selections or [])
    choices: list[str] = []
    value: list[str] = []

    for response, decision in answers:
      choice = response
      if decision == 'Watermarked':
        if response in selected:
          choice = f'Correct! {response}'
        value.append(choice)
      choices.append(choice)

    return {
        reveal_btn: gr.Button(visible=False),
        detections_col: gr.Column(visible=True),
        # `reset` hides this group, so make it visible again explicitly;
        # otherwise it would stay hidden on every round after the first.
        revealed_grp: gr.CheckboxGroup(
            choices=choices, value=value, visible=True
        ),
        reset_btn: gr.Button(visible=True),
    }

  reveal_btn.click(
      reveal,
      inputs=[generations_grp, answers_state],
      outputs=[
          reveal_btn,
          detections_col,
          revealed_grp,
          reset_btn,
      ],
  )

  def reset():
    """Clears the session's answers and returns the UI to its initial state."""
    return {
        answers_state: [],
        generations_col: gr.Column(visible=False),
        detections_col: gr.Column(visible=False),
        revealed_grp: gr.CheckboxGroup(visible=False),
        reset_btn: gr.Button(visible=False),
        generate_btn: gr.Button(value='Generate', interactive=True, visible=True),
    }

  reset_btn.click(
      reset,
      inputs=[],
      outputs=[
          answers_state,
          generations_col,
          detections_col,
          revealed_grp,
          reset_btn,
          generate_btn,
      ],
  )

# Launch the Gradio app only when this file is executed as a script.
if __name__ == '__main__':
  demo.launch()